import os
from datetime import datetime
from typing import List, Tuple
+from fastapi.params import File
import whoosh
from whoosh import writing
from whoosh.searching import Hit
+class FilenameContainsPathError(Exception):
+ def __init__(self, message="Specified filename contains path information"):
+ self.message = message
+ super().__init__(self.message)
+
+
class IndexSchema(SchemaClass):
- filepath = ID(unique=True, stored=True)
+ filename = ID(unique=True, stored=True)
last_modified = STORED()
title = TEXT(field_boost=2)
content = TEXT()
class Note:
- def __init__(self, filepath: str, new: bool = False) -> None:
- if new and os.path.exists(filepath):
+ def __init__(
+ self, flatnotes: "Flatnotes", filename: str, new: bool = False
+ ) -> None:
+ if not self._is_path_safe(filename):
+ raise FilenameContainsPathError
+ self._flatnotes = flatnotes
+ self._filename = filename
+ if new and os.path.exists(self.filepath):
raise FileExistsError
elif new:
- open(filepath, "w").close()
- self._filepath = filepath
+ open(self.filepath, "w").close()
@property
def filepath(self):
- return self._filepath
-
- @property
- def dirpath(self):
- return os.path.split(self._filepath)[0]
+ return os.path.join(self._flatnotes.dir, self._filename)
@property
def title(self):
- return os.path.splitext(self.filename)[0]
+ return os.path.splitext(self._filename)[0]
@property
def last_modified(self):
- return os.path.getmtime(self._filepath)
+ return os.path.getmtime(self.filepath)
# Editable Properties
@property
def filename(self):
- return os.path.split(self._filepath)[1]
+ return self._filename
@filename.setter
def filename(self, new_filename):
- new_filepath = os.path.join(self.dirpath, new_filename)
- os.rename(self._filepath, new_filepath)
- self._filepath = new_filepath
+ if not self._is_path_safe(new_filename):
+ raise FilenameContainsPathError
+ new_filepath = os.path.join(self._flatnotes.dir, new_filename)
+ os.rename(self.filepath, new_filepath)
+ self._filename = new_filename
@property
def content(self):
- with open(self._filepath, "r") as f:
+ with open(self.filepath, "r") as f:
return f.read()
@content.setter
def content(self, new_content):
- if not os.path.exists(self._filepath):
+ if not os.path.exists(self.filepath):
raise FileNotFoundError
- with open(self._filepath, "w") as f:
+ with open(self.filepath, "w") as f:
f.write(new_content)
def delete(self):
- os.remove(self._filepath)
+ os.remove(self.filepath)
+
+ # Functions
+ def _is_path_safe(self, filename: str) -> bool:
+ """Return False if the declared filename contains path
+ information e.g. '../note.md' or 'folder/note.md'."""
+ return os.path.split(filename)[0] == ""
class NoteHit(Note):
- def __init__(self, hit: Hit) -> None:
- self.filepath = hit["filepath"]
+ def __init__(self, flatnotes: "Flatnotes", hit: Hit) -> None:
+ super().__init__(flatnotes, hit["filename"])
self.title_highlights = hit.highlights("title", text=self.title)
self.content_highlights = hit.highlights(
"content",
class Flatnotes(object):
- def __init__(self, notes_dirpath: str) -> None:
- if not os.path.exists(notes_dirpath):
- raise NotADirectoryError(
- f"'{notes_dirpath}' is not a valid directory."
- )
- self.notes_dirpath = notes_dirpath
+ def __init__(self, dir: str) -> None:
+ if not os.path.exists(dir):
+ raise NotADirectoryError(f"'{dir}' is not a valid directory.")
+ self.dir = dir
self.index = self._load_index()
self.last_index_update = None
self.update_index()
@property
- def index_dirpath(self):
- return os.path.join(self.notes_dirpath, ".flatnotes")
+ def index_dir(self):
+ return os.path.join(self.dir, ".flatnotes")
def _load_index(self) -> Index:
"""Load the note index or create new if not exists."""
- if not os.path.exists(self.index_dirpath):
- os.mkdir(self.index_dirpath)
- if whoosh.index.exists_in(self.index_dirpath):
+ if not os.path.exists(self.index_dir):
+ os.mkdir(self.index_dir)
+ if whoosh.index.exists_in(self.index_dir):
logging.info("Existing index loaded")
- return whoosh.index.open_dir(self.index_dirpath)
+ return whoosh.index.open_dir(self.index_dir)
else:
logging.info("New index created")
- return whoosh.index.create_in(self.index_dirpath, IndexSchema)
+ return whoosh.index.create_in(self.index_dir, IndexSchema)
def _add_note_to_index(
self, writer: writing.IndexWriter, note: Note
"""Add a Note object to the index using the given writer. If the
filepath already exists in the index an update will be performed instead."""
writer.update_document(
- filepath=note.filepath,
+ filename=note.filename,
last_modified=note.last_modified,
title=note.title,
content=note.content,
def get_notes(self) -> List[Note]:
"""Return a list containing a Note object for every file in the notes directory."""
return [
- Note(filepath)
- for filepath in glob.glob(os.path.join(self.notes_dirpath, "*.md"))
+ Note(self, os.path.split(filepath)[1])
+ for filepath in glob.glob(os.path.join(self.dir, "*.md"))
]
def update_index(self, clean: bool = False) -> None:
writer.mergetype = writing.CLEAR # Clear the index
with self.index.searcher() as searcher:
for idx_note in searcher.all_stored_fields():
- idx_filepath = idx_note["filepath"]
+ idx_filename = idx_note["filename"]
+ idx_filepath = os.path.join(self.dir, idx_filename)
# Delete missing
if not os.path.exists(idx_filepath):
- writer.delete_by_term("filepath", idx_filepath)
- logging.debug(f"{idx_filepath} removed from index")
+ writer.delete_by_term("filename", idx_filename)
+ logging.debug(f"'{idx_filename}' removed from index")
# Update modified
elif (
os.path.getmtime(idx_filepath) != idx_note["last_modified"]
):
- logging.debug(f"{idx_filepath} updated")
- self._add_note_to_index(writer, Note(idx_filepath))
- indexed.add(idx_filepath)
+ logging.debug(f"'{idx_filename}' updated")
+ self._add_note_to_index(writer, Note(self, idx_filename))
+ indexed.add(idx_filename)
# Ignore already indexed
else:
- indexed.add(idx_filepath)
+ indexed.add(idx_filename)
# Add new
for note in self.get_notes():
- if note.filepath not in indexed:
+ if note.filename not in indexed:
self._add_note_to_index(writer, note)
- logging.debug(f"{note.filepath} added to index")
+ logging.debug(f"'{note.filename}' added to index")
writer.commit()
self.last_index_update = datetime.now()
["title", "content"], self.index.schema
).parse(term)
results = searcher.search(query)
- return tuple(NoteHit(result) for result in results)
+ return tuple(NoteHit(self, hit) for hit in results)
from typing import Dict, List, Optional
from fastapi import FastAPI
-from fastapi.responses import JSONResponse, RedirectResponse
+from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles
-from flatnotes import Flatnotes, Note, NoteHit
-from helpers import CamelCaseBaseModel, is_path_safe
+from error_responses import (
+ file_exists_response,
+ file_not_found_response,
+ filename_contains_path_response,
+)
+from flatnotes import FilenameContainsPathError, Flatnotes, Note, NoteHit
+from helpers import CamelCaseBaseModel
logging.basicConfig(
format="%(asctime)s [%(levelname)s]: %(message)s",
@app.post("/api/notes", response_model=NoteModel)
async def post_note(filename: str, content: str):
"""Create a new note."""
- if not is_path_safe(filename):
- return JSONResponse(status_code=404) # TODO: Different code
- note = Note(os.path.join(flatnotes.notes_dirpath, filename), new=True)
- note.content = content
- # TODO: Handle file exists
- return NoteModel.dump(note, include_content=True)
+ try:
+ note = Note(flatnotes, filename, new=True)
+ note.content = content
+ return NoteModel.dump(note, include_content=True)
+ except FilenameContainsPathError:
+ return filename_contains_path_response
+ except FileExistsError:
+ return file_exists_response
@app.get("/api/notes/{filename}", response_model=NoteModel)
async def get_note(filename: str, include_content: bool = True):
"""Get a specific note."""
- if not is_path_safe(filename):
- return JSONResponse(status_code=404)
- note = Note(os.path.join(flatnotes.notes_dirpath, filename))
try:
+ note = Note(flatnotes, filename)
return NoteModel.dump(note, include_content=include_content)
+ except FilenameContainsPathError:
+ return filename_contains_path_response
except FileNotFoundError:
- return JSONResponse(status_code=404)
+ return file_not_found_response
@app.patch("/api/notes/{filename}", response_model=NoteModel)
async def patch_note(
filename: str, new_filename: str = None, new_content: str = None
):
- if not is_path_safe(filename):
- return JSONResponse(status_code=404)
- note = Note(
- os.path.join(flatnotes.notes_dirpath, filename)
- ) # TODO: Stop repeating this
- if new_filename is not None:
- note.filename = new_filename
- if new_content is not None:
- note.content = new_content
- return NoteModel.dump(note, include_content=True)
+ try:
+ note = Note(flatnotes, filename)
+ if new_filename is not None:
+ note.filename = new_filename
+ if new_content is not None:
+ note.content = new_content
+ return NoteModel.dump(note, include_content=True)
+ except FilenameContainsPathError:
+ return filename_contains_path_response
+ except FileNotFoundError:
+ return file_not_found_response
@app.delete("/api/notes/{filename}")
async def delete_note(filename: str):
- if not is_path_safe(filename):
- return JSONResponse(status_code=404)
- note = Note(os.path.join(flatnotes.notes_dirpath, filename))
- note.delete()
+ try:
+ note = Note(flatnotes, filename)
+ note.delete()
+ except FilenameContainsPathError:
+ return filename_contains_path_response
+ except FileNotFoundError:
+ return file_not_found_response
@app.get("/api/search", response_model=List[NoteHitModel])