rewrite entire backend, add uwu mode
This commit is contained in:
parent
eb3a60b6be
commit
c36537aa02
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,5 +1,4 @@
|
|||||||
*.pem
|
*.pem
|
||||||
*.py*
|
|
||||||
test.sh
|
test.sh
|
||||||
log
|
log
|
||||||
graph*
|
graph*
|
||||||
|
@ -24,3 +24,4 @@
|
|||||||
2024-07-27: improve image accessibility, update blog posts to be compliant with new style
|
2024-07-27: improve image accessibility, update blog posts to be compliant with new style
|
||||||
2024-07-28: add support for planned footer in all relevant html documents, simplify inclusion of other documents in a document using include script
|
2024-07-28: add support for planned footer in all relevant html documents, simplify inclusion of other documents in a document using include script
|
||||||
2024-08-05: add 88x31s, clean up css
|
2024-08-05: add 88x31s, clean up css
|
||||||
|
2024-08-06: rewrote entire backend, website now supports <a href='/?uwu=true>uwu mode</a>
|
||||||
|
9
src/lib/__init__.py
Normal file
9
src/lib/__init__.py
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
from .method import Method
|
||||||
|
from .path import Path
|
||||||
|
from .headers import Headers
|
||||||
|
from .body import Body
|
||||||
|
from .request import Request
|
||||||
|
from .response import Response
|
||||||
|
from .responsecodes import ResponseCode
|
||||||
|
from .server import serve
|
||||||
|
from .content import *
|
40
src/lib/body.py
Normal file
40
src/lib/body.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
from typing import Dict, Any
|
||||||
|
import json
|
||||||
|
from urllib.parse import parse_qs
|
||||||
|
from requests_toolbelt.multipart import decoder
|
||||||
|
|
||||||
|
class Body:
|
||||||
|
def __init__(self, content: bytes, content_type: str):
|
||||||
|
self.content = content
|
||||||
|
self.content_type = content_type
|
||||||
|
self.data = self.parse_body()
|
||||||
|
|
||||||
|
def parse_body(self) -> Dict[str, Any]:
|
||||||
|
if 'application/x-www-form-urlencoded' in self.content_type:
|
||||||
|
return self.parse_form_urlencoded()
|
||||||
|
elif 'application/json' in self.content_type:
|
||||||
|
return self.parse_json()
|
||||||
|
elif 'multipart/form-data' in self.content_type:
|
||||||
|
boundary = self.content_type.split('boundary=')[1]
|
||||||
|
return self.parse_multipart(boundary)
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def parse_form_urlencoded(self) -> Dict[str, Any]:
|
||||||
|
return {key: value[0] if len(value) == 1 else value for key, value in parse_qs(self.content.decode('utf-8')).items()}
|
||||||
|
|
||||||
|
def parse_json(self) -> Dict[str, Any]:
|
||||||
|
return json.loads(self.content.decode('utf-8'))
|
||||||
|
|
||||||
|
def parse_multipart(self, boundary: str) -> Dict[str, Any]:
|
||||||
|
multipart_data = decoder.MultipartDecoder(self.content, boundary)
|
||||||
|
fields = {}
|
||||||
|
for part in multipart_data.parts:
|
||||||
|
fields[part.headers['Content-Disposition'].split('=')[1].strip('"')] = part.text
|
||||||
|
return fields
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return str(self.data)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'Body({self.content=}, {self.content_type=}, {self.data=})'
|
83
src/lib/content.py
Normal file
83
src/lib/content.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
from typing import Dict, Tuple
|
||||||
|
import bleach
|
||||||
|
import mimetypes
|
||||||
|
import subprocess
|
||||||
|
import re
|
||||||
|
import os
|
||||||
|
from .response import Response
|
||||||
|
from .responsecodes import ResponseCode
|
||||||
|
|
||||||
|
env = os.environ.copy()
|
||||||
|
env["PATH"] = "./scripts/:" + env["PATH"]
|
||||||
|
|
||||||
|
def execute_bash_code(match: re.Match) -> str:
|
||||||
|
code = match.group(1)
|
||||||
|
result = subprocess.check_output(code, shell=True, executable='bash', env=env)
|
||||||
|
return result.decode().strip()
|
||||||
|
|
||||||
|
def parse(string: str) -> str:
|
||||||
|
return re.sub(r'\$\[(.*?)\]', execute_bash_code, string)
|
||||||
|
|
||||||
|
def parse_file(filename: str, args: Dict[str, str]={}) -> str:
|
||||||
|
with open(filename, 'r') as file:
|
||||||
|
data = file.read()
|
||||||
|
|
||||||
|
for k, v in args.items():
|
||||||
|
data = data.replace('{'+k+'}', str(v))
|
||||||
|
|
||||||
|
return parse(data)
|
||||||
|
|
||||||
|
def raw_file_contents(file_path: str) -> Tuple[Dict[str, str], bytes]:
|
||||||
|
mime_type, _ = mimetypes.guess_type('.' + file_path)
|
||||||
|
|
||||||
|
if not mime_type:
|
||||||
|
mime_type = 'text/plain'
|
||||||
|
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
data = f.read()
|
||||||
|
|
||||||
|
return {'Content-Type': mime_type}, data
|
||||||
|
|
||||||
|
|
||||||
|
def remove_html_tags(input_string: str) -> str:
|
||||||
|
cleaned_string = bleach.clean(input_string, tags=[], attributes={})
|
||||||
|
return cleaned_string
|
||||||
|
|
||||||
|
def error_page(code: int) -> Response:
|
||||||
|
type = ResponseCode(code)
|
||||||
|
print('error page called')
|
||||||
|
aoeu= Response(
|
||||||
|
type,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
parse(f'''
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<style>$[cat style.css]</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>{type.code}</h1>
|
||||||
|
<p>{type.message}</p>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
''').encode('utf-8')
|
||||||
|
)
|
||||||
|
print(aoeu)
|
||||||
|
return aoeu
|
||||||
|
|
||||||
|
def page(title, body):
|
||||||
|
return parse("""
|
||||||
|
<html>
|
||||||
|
<head>
|
||||||
|
<title>""" + title + """</title>
|
||||||
|
<style>$[cat style.css]</style>
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
$[include html/header.html]
|
||||||
|
<main>
|
||||||
|
""" + body + """
|
||||||
|
</main>
|
||||||
|
$[include html/footer.html]
|
||||||
|
<body>
|
||||||
|
</html>
|
||||||
|
""").encode('utf-8')
|
18
src/lib/headers.py
Normal file
18
src/lib/headers.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Headers:
|
||||||
|
headers: Dict[str, str]
|
||||||
|
|
||||||
|
def has(self, key: str) -> bool:
|
||||||
|
return key in self.headers.keys()
|
||||||
|
|
||||||
|
def get(self, key: str) -> str | None:
|
||||||
|
if self.has(key):
|
||||||
|
return self.headers[key]
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def add(self, key, value) -> None:
|
||||||
|
self.headers[key] = value
|
20
src/lib/method.py
Normal file
20
src/lib/method.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
class Method(Enum):
|
||||||
|
GET = "GET"
|
||||||
|
POST = "POST"
|
||||||
|
PUT = "PUT"
|
||||||
|
DELETE = "DELETE"
|
||||||
|
PATCH = "PATCH"
|
||||||
|
HEAD = "HEAD"
|
||||||
|
OPTIONS = "OPTIONS"
|
||||||
|
|
||||||
|
method: str
|
||||||
|
|
||||||
|
def __new__(cls, method):
|
||||||
|
obj = object.__new__(cls)
|
||||||
|
obj.method = method
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.method
|
90
src/lib/patchers.py
Normal file
90
src/lib/patchers.py
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
from .response import Response
|
||||||
|
|
||||||
|
from typing import Callable, List
|
||||||
|
|
||||||
|
import re
|
||||||
|
import random
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
type Patcher = Callable[[Response, 'Request'], Response]
|
||||||
|
|
||||||
|
def find_substring_in_lines(s, substring):
|
||||||
|
for line_index, line in enumerate(s.splitlines()):
|
||||||
|
position = line.find(substring)
|
||||||
|
if position != -1:
|
||||||
|
return line_index
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def extract_words_from_line(line):
|
||||||
|
clean_line = re.sub(r'<[^>]+>', '', line)
|
||||||
|
words = clean_line.split()
|
||||||
|
return words
|
||||||
|
|
||||||
|
def uwuify_text(text):
|
||||||
|
replacements = [
|
||||||
|
(r'r', 'w'),
|
||||||
|
(r'l', 'w'),
|
||||||
|
(r'R', 'W'),
|
||||||
|
(r'L', 'W'),
|
||||||
|
(r'no', 'nyo'),
|
||||||
|
(r'No', 'Nyo'),
|
||||||
|
(r'u', 'uwu'),
|
||||||
|
(r'U', 'Uwu')
|
||||||
|
]
|
||||||
|
|
||||||
|
for pattern, replacement in replacements:
|
||||||
|
text = re.sub(pattern, replacement, text)
|
||||||
|
|
||||||
|
expressions = [" owo", " UwU", " rawr", " >w<"]
|
||||||
|
sentences = text.split('. ')
|
||||||
|
uwuified_sentences = []
|
||||||
|
|
||||||
|
for sentence in sentences:
|
||||||
|
sentence = sentence.strip()
|
||||||
|
if sentence:
|
||||||
|
uwuified_sentences.append(sentence + (random.choice(expressions) if random.randint(0, 5) > 4 else ''))
|
||||||
|
|
||||||
|
return '. '.join(uwuified_sentences)
|
||||||
|
|
||||||
|
def uwuify(body):
|
||||||
|
body = body.decode('utf-8')
|
||||||
|
soup = BeautifulSoup(body, 'html.parser')
|
||||||
|
|
||||||
|
for text in soup.find_all(text=True):
|
||||||
|
if text.parent.name not in ['script', 'style']:
|
||||||
|
original_text = text.string
|
||||||
|
words = extract_words_from_line(original_text)
|
||||||
|
uwuified_words = [uwuify_text(word) for word in words]
|
||||||
|
uwuified_text = ' '.join(uwuified_words)
|
||||||
|
text.replace_with(uwuified_text)
|
||||||
|
|
||||||
|
for a_tag in soup.find_all('a', href=True):
|
||||||
|
original_href = a_tag['href']
|
||||||
|
if '?' in original_href:
|
||||||
|
new_href = f"{original_href}&uwu=true"
|
||||||
|
else:
|
||||||
|
new_href = f"{original_href}?uwu=true"
|
||||||
|
a_tag['href'] = new_href
|
||||||
|
|
||||||
|
|
||||||
|
return str(soup)
|
||||||
|
|
||||||
|
def is_subdict(sub_dict, main_dict):
|
||||||
|
for key, value in sub_dict.items():
|
||||||
|
if key not in main_dict or main_dict[key] != value:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
patchers: List[Patcher] = [
|
||||||
|
# lambda response, request: Response(
|
||||||
|
# response.code,
|
||||||
|
# response.headers,
|
||||||
|
# "\n".join(line.replace('e', 'a') if index > find_substring_in_lines(response.body.decode('utf-8'), '</head>') else line for index, line in enumerate(response.body.decode('utf-8').splitlines())).encode('utf-8')
|
||||||
|
# ) if 'text/html' in response.headers.values() else response
|
||||||
|
lambda response, request: Response(
|
||||||
|
response.code,
|
||||||
|
response.headers,
|
||||||
|
uwuify(response.body).encode('utf-8')
|
||||||
|
) if 'text/html' in response.headers.values() and is_subdict({'uwu': 'true'}, request.path.params) else response
|
||||||
|
]
|
31
src/lib/path.py
Normal file
31
src/lib/path.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
from urllib.parse import urlsplit, unquote, parse_qs
|
||||||
|
|
||||||
|
class Path:
|
||||||
|
def __init__(self, route: str):
|
||||||
|
self.route = route
|
||||||
|
self.reduce_url()
|
||||||
|
self.get_params()
|
||||||
|
|
||||||
|
def reduce_url(self):
|
||||||
|
_, _, path, _, _ = urlsplit(self.route)
|
||||||
|
|
||||||
|
path = unquote(path)
|
||||||
|
|
||||||
|
segments = []
|
||||||
|
for segment in path.split('/'):
|
||||||
|
if segment != '..':
|
||||||
|
segments.append(segment)
|
||||||
|
elif segments and segments[-1] != '..':
|
||||||
|
segments.pop()
|
||||||
|
|
||||||
|
reduced_path = '/'.join(segments)
|
||||||
|
|
||||||
|
self.path = reduced_path
|
||||||
|
|
||||||
|
def get_params(self):
|
||||||
|
_, _, _, query, _ = urlsplit(self.route)
|
||||||
|
self.params = {key: value[0] if len(value) == 1 else value for key, value in parse_qs(query).items()}
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"Path({self.route=}, {self.path=}, {self.params=})"
|
||||||
|
|
59
src/lib/request.py
Normal file
59
src/lib/request.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from .method import Method
|
||||||
|
from .path import Path
|
||||||
|
from .headers import Headers
|
||||||
|
from .body import Body
|
||||||
|
from .router import routes
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Request:
|
||||||
|
method: Method
|
||||||
|
path: Path
|
||||||
|
version: float
|
||||||
|
headers: Headers
|
||||||
|
body: Body
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, request_bytes: bytes):
|
||||||
|
request_str = request_bytes.decode('utf-8')
|
||||||
|
lines = request_str.split('\r\n')
|
||||||
|
|
||||||
|
request_line = lines[0].split()
|
||||||
|
|
||||||
|
if len(request_line) != 3:
|
||||||
|
raise ValueError("Invalid request line")
|
||||||
|
|
||||||
|
method, path, version_str = request_line
|
||||||
|
version = float(version_str.split('/')[1])
|
||||||
|
|
||||||
|
method = Method(method)
|
||||||
|
path = Path(path)
|
||||||
|
|
||||||
|
headers = Headers({})
|
||||||
|
body = b''
|
||||||
|
|
||||||
|
header_lines = lines[1:]
|
||||||
|
for header_line in header_lines:
|
||||||
|
if header_line == '':
|
||||||
|
break
|
||||||
|
key, value = header_line.split(':', 1)
|
||||||
|
headers.add(key.strip(), value.strip())
|
||||||
|
|
||||||
|
body_start = request_str.find('\r\n\r\n') + 4
|
||||||
|
body = Body(request_bytes[body_start:], headers.get('Content-Type') or 'text/plain')
|
||||||
|
|
||||||
|
return cls(method, path, version, headers, body)
|
||||||
|
|
||||||
|
def match(self):
|
||||||
|
for route in routes:
|
||||||
|
if route.matches(self):
|
||||||
|
print(route)
|
||||||
|
return route
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
path_repr = repr(self.path)
|
||||||
|
body_repr = repr(self.body)
|
||||||
|
return (f"Request(method={self.method!r}, path={path_repr}, version={self.version!r}, "
|
||||||
|
f"headers={self.headers!r}, body={body_repr})")
|
||||||
|
|
21
src/lib/response.py
Normal file
21
src/lib/response.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
from socket import socket
|
||||||
|
from typing import Dict
|
||||||
|
from .responsecodes import ResponseCode
|
||||||
|
|
||||||
|
class Response:
|
||||||
|
def __init__(self, code: ResponseCode, headers: Dict[str, str], body: bytes):
|
||||||
|
self.code = code
|
||||||
|
self.headers = headers
|
||||||
|
self.body = body
|
||||||
|
|
||||||
|
def build_response(self) -> bytes:
|
||||||
|
return (
|
||||||
|
f"HTTP/1.1 {str(self.code)}\r\n".encode('utf-8')
|
||||||
|
+ f"{''.join([f"{key}: {value}\r\n" for key, value in self.headers.items()])}\r\n".encode('utf-8')
|
||||||
|
+ self.body
|
||||||
|
)
|
||||||
|
|
||||||
|
def send(self, client: socket) -> None:
|
||||||
|
print(self)
|
||||||
|
client.sendall(self.build_response())
|
||||||
|
client.close()
|
37
src/lib/responsecodes.py
Normal file
37
src/lib/responsecodes.py
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
class ResponseCode(Enum):
|
||||||
|
OK = (200, "OK")
|
||||||
|
CREATED = (201, "Created")
|
||||||
|
ACCEPTED = (202, "Accepted")
|
||||||
|
NO_CONTENT = (204, "No Content")
|
||||||
|
MOVED_PERMANENTLY = (301, "Moved Permanently")
|
||||||
|
FOUND = (302, "Found")
|
||||||
|
BAD_REQUEST = (400, "Bad Request")
|
||||||
|
UNAUTHORIZED = (401, "Unauthorized")
|
||||||
|
FORBIDDEN = (403, "Forbidden")
|
||||||
|
NOT_FOUND = (404, "Not Found")
|
||||||
|
METHOD_NOT_ALLOWED = (405, "Method Not Allowed")
|
||||||
|
INTERNAL_SERVER_ERROR = (500, "Internal Server Error")
|
||||||
|
NOT_IMPLEMENTED = (501, "Not Implemented")
|
||||||
|
SERVICE_UNAVAILABLE = (503, "Service Unavailable")
|
||||||
|
|
||||||
|
code: str
|
||||||
|
message: int
|
||||||
|
|
||||||
|
def __new__(cls, code, message):
|
||||||
|
obj = object.__new__(cls)
|
||||||
|
obj.code = code
|
||||||
|
obj.message = message
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"{self.code} {self.message}"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _missing_(cls, value):
|
||||||
|
for member in cls:
|
||||||
|
if member.code == value:
|
||||||
|
return member
|
||||||
|
return None
|
||||||
|
|
159
src/lib/router.py
Normal file
159
src/lib/router.py
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
from socket import socket
|
||||||
|
from datetime import datetime
|
||||||
|
from functools import reduce
|
||||||
|
from typing import List, Callable, Tuple
|
||||||
|
from .method import Method
|
||||||
|
from .response import Response
|
||||||
|
from .responsecodes import ResponseCode
|
||||||
|
from .content import *
|
||||||
|
from .patchers import patchers
|
||||||
|
import os
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Route:
|
||||||
|
matcher: Callable
|
||||||
|
methods: List[Method]
|
||||||
|
handler: Callable[['Request', socket, Tuple[str, int]], Response]
|
||||||
|
|
||||||
|
def method_is_allowed(self, method: Method) -> bool:
|
||||||
|
return method in self.methods
|
||||||
|
|
||||||
|
def execute(self, *args):
|
||||||
|
try:
|
||||||
|
response = self.handler(*args)
|
||||||
|
for patcher in patchers:
|
||||||
|
response = patcher(response, args[0])
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
return error_page(500)
|
||||||
|
|
||||||
|
def matches(self, request: 'Request') -> bool:
|
||||||
|
if not self.method_is_allowed(request.method): return False
|
||||||
|
return self.matcher(request.path)
|
||||||
|
|
||||||
|
routes = [
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/',
|
||||||
|
[Method.GET, Method.POST],
|
||||||
|
lambda request, *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
(parse_file('./home.html', dict(prev='\\/')).encode('utf-8') if request.method == Method.GET else (
|
||||||
|
[
|
||||||
|
(lambda form_data: (
|
||||||
|
(lambda time: (
|
||||||
|
print('\n\nFORM DATA!!!!',form_data,request, '\n\n'),
|
||||||
|
f:=open(f'./files/posts-to-homepage/post_{time}.txt', 'w'),
|
||||||
|
f.write(f"<i style='font-family: MapleMonoItalic'>{form_data['name']}</i>@{time}<br>{form_data['text']}<br><br>"),
|
||||||
|
f.close()
|
||||||
|
))(datetime.now().strftime('%Y-%m-%d_%H:%M:%S-%f')[:-3]) if set(form_data.keys()) == set(['text', 'name']) else None
|
||||||
|
))(
|
||||||
|
reduce(
|
||||||
|
lambda acc, d: acc.update(d) or acc,
|
||||||
|
map(lambda key_value_pair: {key_value_pair[0]: remove_html_tags(key_value_pair[1])}, request.body.data.items()),
|
||||||
|
{}
|
||||||
|
)),
|
||||||
|
parse_file('./home.html').encode('utf-8')
|
||||||
|
][1]
|
||||||
|
))
|
||||||
|
) if len(request.body.data) > 0 or request.method != Method.POST else error_page(ResponseCode.BAD_REQUEST)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda path: os.path.isdir('.' + path.path),
|
||||||
|
[Method.GET],
|
||||||
|
lambda request, *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
parse_file('./dir_index.html', dict(path='.' + request.path.path, prev=request.headers.get('Referer').replace('/', '\\/') if request.headers.has('Referer') else '')).encode('utf-8')
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda path: os.path.isfile('.' + path.path) and path.path.startswith('/html/') and (path.path.endswith('.html') or '/thoughts/' in path.path),
|
||||||
|
[Method.GET],
|
||||||
|
lambda request, *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
parse_file('.' + request.path.path, dict(prev=request.headers.get('Referer').replace('/', '\\/') if request.headers.has('Referer') else '')).encode('utf-8')
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda path: os.path.isfile('.' + path.path) and (path.path.startswith('/font/') or path.path.startswith('/files/')),
|
||||||
|
[Method.GET],
|
||||||
|
lambda request, *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
*raw_file_contents('.' + request.path.path)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/status',
|
||||||
|
[Method.GET],
|
||||||
|
lambda *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
parse('<style>$[cat style.css]</style>$[neofetch | ansi2html]').encode('utf-8')
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/stats/is-its-computer-online',
|
||||||
|
[Method.GET],
|
||||||
|
lambda *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
page("online-p", """
|
||||||
|
seconds since last heartbeat message (less than 60: online; less than 120: maybe; more than 120: probably not): $[echo $(( $(date +%s) - $(stat -c %Y ./files/stats/heartbeat) ))]
|
||||||
|
""")
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/stats/what-song-is-it-listening-to',
|
||||||
|
[Method.GET],
|
||||||
|
lambda *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-type': 'text/html'},
|
||||||
|
page("song?", """
|
||||||
|
it is listening to $[cat ./files/stats/song] as of $[echo $(( $(date +%s) - $(stat -c %Y ./files/stats/song) ))] seconds ago.
|
||||||
|
""")
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/stats/is-this-server-online',
|
||||||
|
[Method.GET],
|
||||||
|
lambda *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-type': 'text/html'},
|
||||||
|
page("server online-p", """
|
||||||
|
I think so.
|
||||||
|
""")
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/stats/what-is-its-servers-uptime',
|
||||||
|
[Method.GET],
|
||||||
|
lambda *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-type': 'text/html'},
|
||||||
|
page("uptime", """
|
||||||
|
$[uptime]
|
||||||
|
""")
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda request: request.path == '/stats',
|
||||||
|
[Method.GET],
|
||||||
|
lambda request, *_: Response(
|
||||||
|
ResponseCode.OK,
|
||||||
|
{'Content-Type': 'text/html'},
|
||||||
|
parse_file('./html/stats.html', dict(prev=request.headers.get('Referer').replace('/', '\\/') if request.headers.has('Referer') else '')).encode('utf-8')
|
||||||
|
)
|
||||||
|
),
|
||||||
|
Route(
|
||||||
|
lambda _: True,
|
||||||
|
[Method.GET],
|
||||||
|
lambda *_: error_page(404)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
17
src/lib/server.py
Normal file
17
src/lib/server.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
import socket
|
||||||
|
import threading
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
def serve(address: str, port: int, callback: Callable, wrapper: Callable[[socket.socket], socket.socket] = lambda s: s) -> None:
|
||||||
|
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
|
server_socket.bind((address, port))
|
||||||
|
server_socket.listen(1)
|
||||||
|
server_socket = wrapper(server_socket)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
conn, addr = server_socket.accept()
|
||||||
|
client_connection = threading.Thread(target=callback, args=(conn, addr))
|
||||||
|
client_connection.start()
|
||||||
|
|
||||||
|
finally: server_socket.close()
|
38
src/main.py
Normal file
38
src/main.py
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
from lib import Request, serve
|
||||||
|
from typing import Tuple
|
||||||
|
import threading
|
||||||
|
import socket
|
||||||
|
import ssl
|
||||||
|
import os
|
||||||
|
|
||||||
|
os.chdir('..')
|
||||||
|
|
||||||
|
def handle_client(client: socket.socket, addr: Tuple[str, int]) -> None:
|
||||||
|
request = bytes()
|
||||||
|
|
||||||
|
while (data := client.recv(1024)):
|
||||||
|
request += data
|
||||||
|
print(len(data), data)
|
||||||
|
|
||||||
|
if len(data) < 1024: break
|
||||||
|
|
||||||
|
(request:=Request.from_bytes(request)) \
|
||||||
|
.match() \
|
||||||
|
.execute(request, client, addr) \
|
||||||
|
.send(client)
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
http_thread = threading.Thread(name='http', target=serve, args=('0.0.0.0', 6000, handle_client))
|
||||||
|
https_thread = threading.Thread(name='https', target=serve, args=('0.0.0.0', 6001, handle_client), kwargs=dict(wrapper=lambda socket: [
|
||||||
|
ctx:=ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER),
|
||||||
|
ctx.load_cert_chain(certfile='./badcert.pem', keyfile='./badkey.pem'),
|
||||||
|
ctx.wrap_socket(socket, server_side=True)
|
||||||
|
][-1]
|
||||||
|
))
|
||||||
|
|
||||||
|
http_thread.start()
|
||||||
|
https_thread.start()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user