From: Adam Dullage Date: Mon, 9 Aug 2021 12:39:19 +0000 (+0100) Subject: flatnotes Refactor X-Git-Url: http://git.99rst.org/?a=commitdiff_plain;h=2bcd45dc88df01bc2f90e7fd3fd156f8288377b0;p=flatnotes.git flatnotes Refactor --- diff --git a/flatnotes/error_responses.py b/flatnotes/error_responses.py new file mode 100644 index 0000000..1363dcc --- /dev/null +++ b/flatnotes/error_responses.py @@ -0,0 +1,17 @@ +from fastapi.responses import JSONResponse + +file_exists_response = JSONResponse( + content={"message": "The specified filename already exists."}, + status_code=409, +) + +filename_contains_path_response = JSONResponse( + content={ + "message": "The specified filename contains path information which is forbidden." + }, + status_code=403, +) + +file_not_found_response = JSONResponse( + content={"message": "The specified file cannot be found."}, status_code=404 +) diff --git a/flatnotes/flatnotes.py b/flatnotes/flatnotes.py index 02091b7..54c7df9 100644 --- a/flatnotes/flatnotes.py +++ b/flatnotes/flatnotes.py @@ -3,6 +3,7 @@ import logging import os from datetime import datetime from typing import List, Tuple +from fastapi.params import File import whoosh from whoosh import writing @@ -12,67 +13,82 @@ from whoosh.qparser import MultifieldParser from whoosh.searching import Hit +class FilenameContainsPathError(Exception): + def __init__(self, message="Specified filename contains path information"): + self.message = message + super().__init__(self.message) + + class IndexSchema(SchemaClass): - filepath = ID(unique=True, stored=True) + filename = ID(unique=True, stored=True) last_modified = STORED() title = TEXT(field_boost=2) content = TEXT() class Note: - def __init__(self, filepath: str, new: bool = False) -> None: - if new and os.path.exists(filepath): + def __init__( + self, flatnotes: "Flatnotes", filename: str, new: bool = False + ) -> None: + if not self._is_path_safe(filename): + raise FilenameContainsPathError + self._flatnotes = flatnotes + self._filename = filename + if new and os.path.exists(self.filepath): raise FileExistsError elif new: - open(filepath, "w").close() - self._filepath = filepath + open(self.filepath, "w").close() @property def filepath(self): - return self._filepath - - @property - def dirpath(self): - return os.path.split(self._filepath)[0] + return os.path.join(self._flatnotes.dir, self._filename) @property def title(self): - return os.path.splitext(self.filename)[0] + return os.path.splitext(self._filename)[0] @property def last_modified(self): - return os.path.getmtime(self._filepath) + return os.path.getmtime(self.filepath) # Editable Properties @property def filename(self): - return os.path.split(self._filepath)[1] + return self._filename @filename.setter def filename(self, new_filename): - new_filepath = os.path.join(self.dirpath, new_filename) - os.rename(self._filepath, new_filepath) - self._filepath = new_filepath + if not self._is_path_safe(new_filename): + raise FilenameContainsPathError + new_filepath = os.path.join(self._flatnotes.dir, new_filename) + os.rename(self.filepath, new_filepath) + self._filename = new_filename @property def content(self): - with open(self._filepath, "r") as f: + with open(self.filepath, "r") as f: return f.read() @content.setter def content(self, new_content): - if not os.path.exists(self._filepath): + if not os.path.exists(self.filepath): raise FileNotFoundError - with open(self._filepath, "w") as f: + with open(self.filepath, "w") as f: f.write(new_content) def delete(self): - os.remove(self._filepath) + os.remove(self.filepath) + + # Functions + def _is_path_safe(self, filename: str) -> bool: + """Return False if the declared filename contains path + information e.g. '../note.md' or 'folder/note.md'.""" + return os.path.split(filename)[0] == "" class NoteHit(Note): - def __init__(self, hit: Hit) -> None: - self.filepath = hit["filepath"] + def __init__(self, flatnotes: "Flatnotes", hit: Hit) -> None: + super().__init__(flatnotes, hit["filename"]) self.title_highlights = hit.highlights("title", text=self.title) self.content_highlights = hit.highlights( "content", @@ -81,31 +97,29 @@ class NoteHit(Note): class Flatnotes(object): - def __init__(self, notes_dirpath: str) -> None: - if not os.path.exists(notes_dirpath): - raise NotADirectoryError( - f"'{notes_dirpath}' is not a valid directory." - ) - self.notes_dirpath = notes_dirpath + def __init__(self, dir: str) -> None: + if not os.path.exists(dir): + raise NotADirectoryError(f"'{dir}' is not a valid directory.") + self.dir = dir self.index = self._load_index() self.last_index_update = None self.update_index() @property - def index_dirpath(self): - return os.path.join(self.notes_dirpath, ".flatnotes") + def index_dir(self): + return os.path.join(self.dir, ".flatnotes") def _load_index(self) -> Index: """Load the note index or create new if not exists.""" - if not os.path.exists(self.index_dirpath): - os.mkdir(self.index_dirpath) - if whoosh.index.exists_in(self.index_dirpath): + if not os.path.exists(self.index_dir): + os.mkdir(self.index_dir) + if whoosh.index.exists_in(self.index_dir): logging.info("Existing index loaded") - return whoosh.index.open_dir(self.index_dirpath) + return whoosh.index.open_dir(self.index_dir) else: logging.info("New index created") - return whoosh.index.create_in(self.index_dirpath, IndexSchema) + return whoosh.index.create_in(self.index_dir, IndexSchema) def _add_note_to_index( self, writer: writing.IndexWriter, note: Note @@ -113,7 +127,7 @@ class Flatnotes(object): """Add a Note object to the index using the given writer. If the filepath already exists in the index an update will be performed instead.""" writer.update_document( - filepath=note.filepath, + filename=note.filename, last_modified=note.last_modified, title=note.title, content=note.content, @@ -122,8 +136,8 @@ class Flatnotes(object): def get_notes(self) -> List[Note]: """Return a list containing a Note object for every file in the notes directory.""" return [ - Note(filepath) - for filepath in glob.glob(os.path.join(self.notes_dirpath, "*.md")) + Note(self, os.path.split(filepath)[1]) + for filepath in glob.glob(os.path.join(self.dir, "*.md")) ] def update_index(self, clean: bool = False) -> None: @@ -135,26 +149,27 @@ class Flatnotes(object): writer.mergetype = writing.CLEAR # Clear the index with self.index.searcher() as searcher: for idx_note in searcher.all_stored_fields(): - idx_filepath = idx_note["filepath"] + idx_filename = idx_note["filename"] + idx_filepath = os.path.join(self.dir, idx_filename) # Delete missing if not os.path.exists(idx_filepath): - writer.delete_by_term("filepath", idx_filepath) - logging.debug(f"{idx_filepath} removed from index") + writer.delete_by_term("filename", idx_filename) + logging.debug(f"'{idx_filename}' removed from index") # Update modified elif ( os.path.getmtime(idx_filepath) != idx_note["last_modified"] ): - logging.debug(f"{idx_filepath} updated") - self._add_note_to_index(writer, Note(idx_filepath)) - indexed.add(idx_filepath) + logging.debug(f"'{idx_filename}' updated") + self._add_note_to_index(writer, Note(self, idx_filename)) + indexed.add(idx_filename) # Ignore already indexed else: - indexed.add(idx_filepath) + indexed.add(idx_filename) # Add new for note in self.get_notes(): - if note.filepath not in indexed: + if note.filename not in indexed: self._add_note_to_index(writer, note) - logging.debug(f"{note.filepath} added to index") + logging.debug(f"'{note.filename}' added to index") writer.commit() self.last_index_update = datetime.now() @@ -174,4 +189,4 @@ class Flatnotes(object): ["title", "content"], self.index.schema ).parse(term) results = searcher.search(query) - return tuple(NoteHit(result) for result in results) + return tuple(NoteHit(self, hit) for hit in results) diff --git a/flatnotes/helpers.py b/flatnotes/helpers.py index 9964cc8..25750bc 100644 --- a/flatnotes/helpers.py +++ b/flatnotes/helpers.py @@ -2,12 +2,6 @@ import os from pydantic import BaseModel -def is_path_safe(filename: str) -> bool: - """Return False if the declared filename contains path - information e.g. '../note.md' or 'folder/note.md'.""" - return os.path.split(filename)[0] == "" - - def camel_case(snake_case_str: str) -> str: """Return the declared snake_case string in camelCase.""" parts = [part for part in snake_case_str.split("_") if part != ""] diff --git a/flatnotes/main.py b/flatnotes/main.py index 58267e1..f6f7b08 100644 --- a/flatnotes/main.py +++ b/flatnotes/main.py @@ -3,11 +3,16 @@ import os from typing import Dict, List, Optional from fastapi import FastAPI -from fastapi.responses import JSONResponse, RedirectResponse +from fastapi.responses import RedirectResponse from fastapi.staticfiles import StaticFiles -from flatnotes import Flatnotes, Note, NoteHit -from helpers import CamelCaseBaseModel, is_path_safe +from error_responses import ( + file_exists_response, + file_not_found_response, + filename_contains_path_response, +) +from flatnotes import FilenameContainsPathError, Flatnotes, Note, NoteHit +from helpers import CamelCaseBaseModel logging.basicConfig( format="%(asctime)s [%(levelname)s]: %(message)s", @@ -68,48 +73,54 @@ async def get_notes(include_content: bool = False): @app.post("/api/notes", response_model=NoteModel) async def post_note(filename: str, content: str): """Create a new note.""" - if not is_path_safe(filename): - return JSONResponse(status_code=404) # TODO: Different code - note = Note(os.path.join(flatnotes.notes_dirpath, filename), new=True) - note.content = content - # TODO: Handle file exists - return NoteModel.dump(note, include_content=True) + try: + note = Note(flatnotes, filename, new=True) + note.content = content + return NoteModel.dump(note, include_content=True) + except FilenameContainsPathError: + return filename_contains_path_response + except FileExistsError: + return file_exists_response @app.get("/api/notes/{filename}", response_model=NoteModel) async def get_note(filename: str, include_content: bool = True): """Get a specific note.""" - if not is_path_safe(filename): - return JSONResponse(status_code=404) - note = Note(os.path.join(flatnotes.notes_dirpath, filename)) try: + note = Note(flatnotes, filename) return NoteModel.dump(note, include_content=include_content) + except FilenameContainsPathError: + return filename_contains_path_response except FileNotFoundError: - return JSONResponse(status_code=404) + return file_not_found_response @app.patch("/api/notes/{filename}", response_model=NoteModel) async def patch_note( filename: str, new_filename: str = None, new_content: str = None ): - if not is_path_safe(filename): - return JSONResponse(status_code=404) - note = Note( - os.path.join(flatnotes.notes_dirpath, filename) - ) # TODO: Stop repeating this - if new_filename is not None: - note.filename = new_filename - if new_content is not None: - note.content = new_content - return NoteModel.dump(note, include_content=True) + try: + note = Note(flatnotes, filename) + if new_filename is not None: + note.filename = new_filename + if new_content is not None: + note.content = new_content + return NoteModel.dump(note, include_content=True) + except FilenameContainsPathError: + return filename_contains_path_response + except FileNotFoundError: + return file_not_found_response @app.delete("/api/notes/{filename}") async def delete_note(filename: str): - if not is_path_safe(filename): - return JSONResponse(status_code=404) - note = Note(os.path.join(flatnotes.notes_dirpath, filename)) - note.delete() + try: + note = Note(flatnotes, filename) + note.delete() + except FilenameContainsPathError: + return filename_contains_path_response + except FileNotFoundError: + return file_not_found_response @app.get("/api/search", response_model=List[NoteHitModel])