flatnotes Refactor
authorAdam Dullage <redacted>
Mon, 9 Aug 2021 12:39:19 +0000 (13:39 +0100)
committerAdam Dullage <redacted>
Mon, 9 Aug 2021 12:39:19 +0000 (13:39 +0100)
flatnotes/error_responses.py [new file with mode: 0644]
flatnotes/flatnotes.py
flatnotes/helpers.py
flatnotes/main.py

diff --git a/flatnotes/error_responses.py b/flatnotes/error_responses.py
new file mode 100644 (file)
index 0000000..1363dcc
--- /dev/null
@@ -0,0 +1,17 @@
+from fastapi.responses import JSONResponse
+
+file_exists_response = JSONResponse(
+    content={"message": "The specified filename already exists."},
+    status_code=409,
+)
+
+filename_contains_path_response = JSONResponse(
+    content={
+        "message": "The specified filename contains path information which is forbidden."
+    },
+    status_code=403,
+)
+
+file_not_found_response = JSONResponse(
+    content={"message": "The specified file cannot be found."}, status_code=404
+)
index 02091b73fe6bdfd39c9a260989c0cbb085773c7b..54c7df9ab3b21a504330c75908b9acf62253f119 100644 (file)
@@ -3,6 +3,7 @@ import logging
 import os
 from datetime import datetime
 from typing import List, Tuple
+from fastapi.params import File
 
 import whoosh
 from whoosh import writing
@@ -12,67 +13,82 @@ from whoosh.qparser import MultifieldParser
 from whoosh.searching import Hit
 
 
+class FilenameContainsPathError(Exception):
+    def __init__(self, message="Specified filename contains path information"):
+        self.message = message
+        super().__init__(self.message)
+
+
 class IndexSchema(SchemaClass):
-    filepath = ID(unique=True, stored=True)
+    filename = ID(unique=True, stored=True)
     last_modified = STORED()
     title = TEXT(field_boost=2)
     content = TEXT()
 
 
 class Note:
-    def __init__(self, filepath: str, new: bool = False) -> None:
-        if new and os.path.exists(filepath):
+    def __init__(
+        self, flatnotes: "Flatnotes", filename: str, new: bool = False
+    ) -> None:
+        if not self._is_path_safe(filename):
+            raise FilenameContainsPathError
+        self._flatnotes = flatnotes
+        self._filename = filename
+        if new and os.path.exists(self.filepath):
             raise FileExistsError
         elif new:
-            open(filepath, "w").close()
-        self._filepath = filepath
+            open(self.filepath, "w").close()
 
     @property
     def filepath(self):
-        return self._filepath
-
-    @property
-    def dirpath(self):
-        return os.path.split(self._filepath)[0]
+        return os.path.join(self._flatnotes.dir, self._filename)
 
     @property
     def title(self):
-        return os.path.splitext(self.filename)[0]
+        return os.path.splitext(self._filename)[0]
 
     @property
     def last_modified(self):
-        return os.path.getmtime(self._filepath)
+        return os.path.getmtime(self.filepath)
 
     # Editable Properties
     @property
     def filename(self):
-        return os.path.split(self._filepath)[1]
+        return self._filename
 
     @filename.setter
     def filename(self, new_filename):
-        new_filepath = os.path.join(self.dirpath, new_filename)
-        os.rename(self._filepath, new_filepath)
-        self._filepath = new_filepath
+        if not self._is_path_safe(new_filename):
+            raise FilenameContainsPathError
+        new_filepath = os.path.join(self._flatnotes.dir, new_filename)
+        os.rename(self.filepath, new_filepath)
+        self._filename = new_filename
 
     @property
     def content(self):
-        with open(self._filepath, "r") as f:
+        with open(self.filepath, "r") as f:
             return f.read()
 
     @content.setter
     def content(self, new_content):
-        if not os.path.exists(self._filepath):
+        if not os.path.exists(self.filepath):
             raise FileNotFoundError
-        with open(self._filepath, "w") as f:
+        with open(self.filepath, "w") as f:
             f.write(new_content)
 
     def delete(self):
-        os.remove(self._filepath)
+        os.remove(self.filepath)
+
+    # Functions
+    def _is_path_safe(self, filename: str) -> bool:
+        """Return False if the declared filename contains path
+        information e.g. '../note.md' or 'folder/note.md'."""
+        return os.path.split(filename)[0] == ""
 
 
 class NoteHit(Note):
-    def __init__(self, hit: Hit) -> None:
-        self.filepath = hit["filepath"]
+    def __init__(self, flatnotes: "Flatnotes", hit: Hit) -> None:
+        super().__init__(flatnotes, hit["filename"])
         self.title_highlights = hit.highlights("title", text=self.title)
         self.content_highlights = hit.highlights(
             "content",
@@ -81,31 +97,29 @@ class NoteHit(Note):
 
 
 class Flatnotes(object):
-    def __init__(self, notes_dirpath: str) -> None:
-        if not os.path.exists(notes_dirpath):
-            raise NotADirectoryError(
-                f"'{notes_dirpath}' is not a valid directory."
-            )
-        self.notes_dirpath = notes_dirpath
+    def __init__(self, dir: str) -> None:
+        if not os.path.exists(dir):
+            raise NotADirectoryError(f"'{dir}' is not a valid directory.")
+        self.dir = dir
 
         self.index = self._load_index()
         self.last_index_update = None
         self.update_index()
 
     @property
-    def index_dirpath(self):
-        return os.path.join(self.notes_dirpath, ".flatnotes")
+    def index_dir(self):
+        return os.path.join(self.dir, ".flatnotes")
 
     def _load_index(self) -> Index:
         """Load the note index or create new if not exists."""
-        if not os.path.exists(self.index_dirpath):
-            os.mkdir(self.index_dirpath)
-        if whoosh.index.exists_in(self.index_dirpath):
+        if not os.path.exists(self.index_dir):
+            os.mkdir(self.index_dir)
+        if whoosh.index.exists_in(self.index_dir):
             logging.info("Existing index loaded")
-            return whoosh.index.open_dir(self.index_dirpath)
+            return whoosh.index.open_dir(self.index_dir)
         else:
             logging.info("New index created")
-            return whoosh.index.create_in(self.index_dirpath, IndexSchema)
+            return whoosh.index.create_in(self.index_dir, IndexSchema)
 
     def _add_note_to_index(
         self, writer: writing.IndexWriter, note: Note
@@ -113,7 +127,7 @@ class Flatnotes(object):
         """Add a Note object to the index using the given writer. If the
         filepath already exists in the index an update will be performed instead."""
         writer.update_document(
-            filepath=note.filepath,
+            filename=note.filename,
             last_modified=note.last_modified,
             title=note.title,
             content=note.content,
@@ -122,8 +136,8 @@ class Flatnotes(object):
     def get_notes(self) -> List[Note]:
         """Return a list containing a Note object for every file in the notes directory."""
         return [
-            Note(filepath)
-            for filepath in glob.glob(os.path.join(self.notes_dirpath, "*.md"))
+            Note(self, os.path.split(filepath)[1])
+            for filepath in glob.glob(os.path.join(self.dir, "*.md"))
         ]
 
     def update_index(self, clean: bool = False) -> None:
@@ -135,26 +149,27 @@ class Flatnotes(object):
             writer.mergetype = writing.CLEAR  # Clear the index
         with self.index.searcher() as searcher:
             for idx_note in searcher.all_stored_fields():
-                idx_filepath = idx_note["filepath"]
+                idx_filename = idx_note["filename"]
+                idx_filepath = os.path.join(self.dir, idx_filename)
                 # Delete missing
                 if not os.path.exists(idx_filepath):
-                    writer.delete_by_term("filepath", idx_filepath)
-                    logging.debug(f"{idx_filepath} removed from index")
+                    writer.delete_by_term("filename", idx_filename)
+                    logging.debug(f"'{idx_filename}' removed from index")
                 # Update modified
                 elif (
                     os.path.getmtime(idx_filepath) != idx_note["last_modified"]
                 ):
-                    logging.debug(f"{idx_filepath} updated")
-                    self._add_note_to_index(writer, Note(idx_filepath))
-                    indexed.add(idx_filepath)
+                    logging.debug(f"'{idx_filename}' updated")
+                    self._add_note_to_index(writer, Note(self, idx_filename))
+                    indexed.add(idx_filename)
                 # Ignore already indexed
                 else:
-                    indexed.add(idx_filepath)
+                    indexed.add(idx_filename)
         # Add new
         for note in self.get_notes():
-            if note.filepath not in indexed:
+            if note.filename not in indexed:
                 self._add_note_to_index(writer, note)
-                logging.debug(f"{note.filepath} added to index")
+                logging.debug(f"'{note.filename}' added to index")
         writer.commit()
         self.last_index_update = datetime.now()
 
@@ -174,4 +189,4 @@ class Flatnotes(object):
                 ["title", "content"], self.index.schema
             ).parse(term)
             results = searcher.search(query)
-            return tuple(NoteHit(result) for result in results)
+            return tuple(NoteHit(self, hit) for hit in results)
index 9964cc879c3bd58c8cbc88b5b3f137a2defc9394..25750bc9c594139cd2ed0d8bcd8d3bcf3ec6dcac 100644 (file)
@@ -2,12 +2,6 @@ import os
 from pydantic import BaseModel
 
 
-def is_path_safe(filename: str) -> bool:
-    """Return False if the declared filename contains path
-    information e.g. '../note.md' or 'folder/note.md'."""
-    return os.path.split(filename)[0] == ""
-
-
 def camel_case(snake_case_str: str) -> str:
     """Return the declared snake_case string in camelCase."""
     parts = [part for part in snake_case_str.split("_") if part != ""]
index 58267e16d0779dcfe3bd1d0986a61462ac1250fc..f6f7b08ad88dfc4cd524eb61e2d72f77b9682f26 100644 (file)
@@ -3,11 +3,16 @@ import os
 from typing import Dict, List, Optional
 
 from fastapi import FastAPI
-from fastapi.responses import JSONResponse, RedirectResponse
+from fastapi.responses import RedirectResponse
 from fastapi.staticfiles import StaticFiles
 
-from flatnotes import Flatnotes, Note, NoteHit
-from helpers import CamelCaseBaseModel, is_path_safe
+from error_responses import (
+    file_exists_response,
+    file_not_found_response,
+    filename_contains_path_response,
+)
+from flatnotes import FilenameContainsPathError, Flatnotes, Note, NoteHit
+from helpers import CamelCaseBaseModel
 
 logging.basicConfig(
     format="%(asctime)s [%(levelname)s]: %(message)s",
@@ -68,48 +73,54 @@ async def get_notes(include_content: bool = False):
 @app.post("/api/notes", response_model=NoteModel)
 async def post_note(filename: str, content: str):
     """Create a new note."""
-    if not is_path_safe(filename):
-        return JSONResponse(status_code=404)  # TODO: Different code
-    note = Note(os.path.join(flatnotes.notes_dirpath, filename), new=True)
-    note.content = content
-    # TODO: Handle file exists
-    return NoteModel.dump(note, include_content=True)
+    try:
+        note = Note(flatnotes, filename, new=True)
+        note.content = content
+        return NoteModel.dump(note, include_content=True)
+    except FilenameContainsPathError:
+        return filename_contains_path_response
+    except FileExistsError:
+        return file_exists_response
 
 
 @app.get("/api/notes/{filename}", response_model=NoteModel)
 async def get_note(filename: str, include_content: bool = True):
     """Get a specific note."""
-    if not is_path_safe(filename):
-        return JSONResponse(status_code=404)
-    note = Note(os.path.join(flatnotes.notes_dirpath, filename))
     try:
+        note = Note(flatnotes, filename)
         return NoteModel.dump(note, include_content=include_content)
+    except FilenameContainsPathError:
+        return filename_contains_path_response
     except FileNotFoundError:
-        return JSONResponse(status_code=404)
+        return file_not_found_response
 
 
 @app.patch("/api/notes/{filename}", response_model=NoteModel)
 async def patch_note(
     filename: str, new_filename: str = None, new_content: str = None
 ):
-    if not is_path_safe(filename):
-            return JSONResponse(status_code=404)
-    note = Note(
-        os.path.join(flatnotes.notes_dirpath, filename)
-    )  # TODO: Stop repeating this
-    if new_filename is not None:
-        note.filename = new_filename
-    if new_content is not None:
-        note.content = new_content
-    return NoteModel.dump(note, include_content=True)
+    try:
+        note = Note(flatnotes, filename)
+        if new_filename is not None:
+            note.filename = new_filename
+        if new_content is not None:
+            note.content = new_content
+        return NoteModel.dump(note, include_content=True)
+    except FilenameContainsPathError:
+        return filename_contains_path_response
+    except FileNotFoundError:
+        return file_not_found_response
 
 
 @app.delete("/api/notes/{filename}")
 async def delete_note(filename: str):
-    if not is_path_safe(filename):
-            return JSONResponse(status_code=404)
-    note = Note(os.path.join(flatnotes.notes_dirpath, filename))
-    note.delete()
+    try:
+        note = Note(flatnotes, filename)
+        note.delete()
+    except FilenameContainsPathError:
+        return filename_contains_path_response
+    except FileNotFoundError:
+        return file_not_found_response
 
 
 @app.get("/api/search", response_model=List[NoteHitModel])
git clone https://git.99rst.org/PROJECT