1
0
forked from nat/sludge

seperate sludge from site repository

This commit is contained in:
gnat 2024-08-09 18:33:30 -07:00
parent dcb3bc4b27
commit e39b59853f
15 changed files with 624 additions and 2 deletions

1
.gitignore vendored
View File

@ -15,7 +15,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/

View File

@ -1,2 +1,3 @@
# sludge
# Sludge: webthing for natalieee.net
it rhymes with kludge.

9
src/lib/__init__.py Normal file
View File

@ -0,0 +1,9 @@
from .method import Method
from .path import Path
from .headers import Headers
from .body import Body
from .request import Request
from .response import Response
from .responsecodes import ResponseCode
from .server import serve
from .content import *

40
src/lib/body.py Normal file
View File

@ -0,0 +1,40 @@
from typing import Dict, Any
import json
from urllib.parse import parse_qs
from requests_toolbelt.multipart import decoder
class Body:
def __init__(self, content: bytes, content_type: str):
self.content = content
self.content_type = content_type
self.data = self.parse_body()
def parse_body(self) -> Dict[str, Any]:
if 'application/x-www-form-urlencoded' in self.content_type:
return self.parse_form_urlencoded()
elif 'application/json' in self.content_type:
return self.parse_json()
elif 'multipart/form-data' in self.content_type:
boundary = self.content_type.split('boundary=')[1]
return self.parse_multipart(boundary)
else:
return {}
def parse_form_urlencoded(self) -> Dict[str, Any]:
return {key: value[0] if len(value) == 1 else value for key, value in parse_qs(self.content.decode('utf-8')).items()}
def parse_json(self) -> Dict[str, Any]:
return json.loads(self.content.decode('utf-8'))
def parse_multipart(self, boundary: str) -> Dict[str, Any]:
multipart_data = decoder.MultipartDecoder(self.content, boundary)
fields = {}
for part in multipart_data.parts:
fields[part.headers['Content-Disposition'].split('=')[1].strip('"')] = part.text
return fields
def __str__(self):
return str(self.data)
def __repr__(self):
return f'Body({self.content=}, {self.content_type=}, {self.data=})'

83
src/lib/content.py Normal file
View File

@ -0,0 +1,83 @@
from typing import Dict, Tuple
import bleach
import mimetypes
import subprocess
import re
import os
from .response import Response
from .responsecodes import ResponseCode
env = os.environ.copy()
env["PATH"] = "./scripts/:" + env["PATH"]
def execute_bash_code(match: re.Match) -> str:
code = match.group(1)
result = subprocess.check_output(code, shell=True, executable='bash', env=env)
return result.decode().strip()
def parse(string: str) -> str:
return re.sub(r'\$\[(.*?)\]', execute_bash_code, string)
def parse_file(filename: str, args: Dict[str, str]={}) -> str:
with open(filename, 'r') as file:
data = file.read()
for k, v in args.items():
data = data.replace('{'+k+'}', str(v))
return parse(data)
def raw_file_contents(file_path: str) -> Tuple[Dict[str, str], bytes]:
mime_type, _ = mimetypes.guess_type('.' + file_path)
if not mime_type:
mime_type = 'text/plain'
with open(file_path, 'rb') as f:
data = f.read()
return {'Content-Type': mime_type}, data
def remove_html_tags(input_string: str) -> str:
cleaned_string = bleach.clean(input_string, tags=[], attributes={})
return cleaned_string
def error_page(code: int) -> Response:
type = ResponseCode(code)
print('error page called')
aoeu= Response(
type,
{'Content-Type': 'text/html'},
parse(f'''
<html>
<head>
<style>$[cat style.css]</style>
</head>
<body>
<h1>{type.code}</h1>
<p>{type.message}</p>
</body>
</html>
''').encode('utf-8')
)
print(aoeu)
return aoeu
def page(title, body):
return parse("""
<html>
<head>
<title>""" + title + """</title>
<style>$[cat style.css]</style>
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
$[include html/header.html]
<main>
""" + body + """
</main>
$[include html/footer.html]
<body>
</html>
""").encode('utf-8')

18
src/lib/headers.py Normal file
View File

@ -0,0 +1,18 @@
from dataclasses import dataclass
from typing import Dict
@dataclass
class Headers:
headers: Dict[str, str]
def has(self, key: str) -> bool:
return key in self.headers.keys()
def get(self, key: str) -> str | None:
if self.has(key):
return self.headers[key]
return None
def add(self, key, value) -> None:
self.headers[key] = value

20
src/lib/method.py Normal file
View File

@ -0,0 +1,20 @@
from enum import Enum
class Method(Enum):
GET = "GET"
POST = "POST"
PUT = "PUT"
DELETE = "DELETE"
PATCH = "PATCH"
HEAD = "HEAD"
OPTIONS = "OPTIONS"
method: str
def __new__(cls, method):
obj = object.__new__(cls)
obj.method = method
return obj
def __str__(self):
return self.method

90
src/lib/patchers.py Normal file
View File

@ -0,0 +1,90 @@
from .response import Response
from typing import Callable, List
import re
import random
from bs4 import BeautifulSoup
type Patcher = Callable[[Response, 'Request'], Response]
def find_substring_in_lines(s, substring):
for line_index, line in enumerate(s.splitlines()):
position = line.find(substring)
if position != -1:
return line_index
return 0
def extract_words_from_line(line):
clean_line = re.sub(r'<[^>]+>', '', line)
words = clean_line.split()
return words
def uwuify_text(text):
replacements = [
(r'r', 'w'),
(r'l', 'w'),
(r'R', 'W'),
(r'L', 'W'),
(r'no', 'nyo'),
(r'No', 'Nyo'),
(r'u', 'uwu'),
(r'U', 'Uwu')
]
for pattern, replacement in replacements:
text = re.sub(pattern, replacement, text)
expressions = [" owo", " UwU", " rawr", " >w<"]
sentences = text.split('. ')
uwuified_sentences = []
for sentence in sentences:
sentence = sentence.strip()
if sentence:
uwuified_sentences.append(sentence + (random.choice(expressions) if random.randint(0, 5) > 4 else ''))
return '. '.join(uwuified_sentences)
def uwuify(body):
body = body.decode('utf-8')
soup = BeautifulSoup(body, 'html.parser')
for text in soup.find_all(text=True):
if text.parent.name not in ['script', 'style']:
original_text = text.string
words = extract_words_from_line(original_text)
uwuified_words = [uwuify_text(word) for word in words]
uwuified_text = ' '.join(uwuified_words)
text.replace_with(uwuified_text)
for a_tag in soup.find_all('a', href=True):
original_href = a_tag['href']
if '?' in original_href:
new_href = f"{original_href}&uwu=true"
else:
new_href = f"{original_href}?uwu=true"
a_tag['href'] = new_href
return str(soup)
def is_subdict(sub_dict, main_dict):
for key, value in sub_dict.items():
if key not in main_dict or main_dict[key] != value:
return False
return True
patchers: List[Patcher] = [
# lambda response, request: Response(
# response.code,
# response.headers,
# "\n".join(line.replace('e', 'a') if index > find_substring_in_lines(response.body.decode('utf-8'), '</head>') else line for index, line in enumerate(response.body.decode('utf-8').splitlines())).encode('utf-8')
# ) if 'text/html' in response.headers.values() else response
lambda response, request: Response(
response.code,
response.headers,
uwuify(response.body).encode('utf-8')
) if 'text/html' in response.headers.values() and is_subdict({'uwu': 'true'}, request.path.params) else response
]

31
src/lib/path.py Normal file
View File

@ -0,0 +1,31 @@
from urllib.parse import urlsplit, unquote, parse_qs
class Path:
def __init__(self, route: str):
self.route = route
self.reduce_url()
self.get_params()
def reduce_url(self):
_, _, path, _, _ = urlsplit(self.route)
path = unquote(path)
segments = []
for segment in path.split('/'):
if segment != '..':
segments.append(segment)
elif segments and segments[-1] != '..':
segments.pop()
reduced_path = '/'.join(segments)
self.path = reduced_path
def get_params(self):
_, _, _, query, _ = urlsplit(self.route)
self.params = {key: value[0] if len(value) == 1 else value for key, value in parse_qs(query).items()}
def __repr__(self):
return f"Path({self.route=}, {self.path=}, {self.params=})"

59
src/lib/request.py Normal file
View File

@ -0,0 +1,59 @@
from dataclasses import dataclass
from .method import Method
from .path import Path
from .headers import Headers
from .body import Body
from .router import routes
@dataclass
class Request:
method: Method
path: Path
version: float
headers: Headers
body: Body
@classmethod
def from_bytes(cls, request_bytes: bytes):
request_str = request_bytes.decode('utf-8')
lines = request_str.split('\r\n')
request_line = lines[0].split()
if len(request_line) != 3:
raise ValueError("Invalid request line")
method, path, version_str = request_line
version = float(version_str.split('/')[1])
method = Method(method)
path = Path(path)
headers = Headers({})
body = b''
header_lines = lines[1:]
for header_line in header_lines:
if header_line == '':
break
key, value = header_line.split(':', 1)
headers.add(key.strip(), value.strip())
body_start = request_str.find('\r\n\r\n') + 4
body = Body(request_bytes[body_start:], headers.get('Content-Type') or 'text/plain')
return cls(method, path, version, headers, body)
def match(self):
for route in routes:
if route.matches(self):
print(route)
return route
def __repr__(self):
path_repr = repr(self.path)
body_repr = repr(self.body)
return (f"Request(method={self.method!r}, path={path_repr}, version={self.version!r}, "
f"headers={self.headers!r}, body={body_repr})")

21
src/lib/response.py Normal file
View File

@ -0,0 +1,21 @@
from socket import socket
from typing import Dict
from .responsecodes import ResponseCode
class Response:
def __init__(self, code: ResponseCode, headers: Dict[str, str], body: bytes):
self.code = code
self.headers = headers
self.body = body
def build_response(self) -> bytes:
return (
f"HTTP/1.1 {str(self.code)}\r\n".encode('utf-8')
+ f"{''.join([f"{key}: {value}\r\n" for key, value in self.headers.items()])}\r\n".encode('utf-8')
+ self.body
)
def send(self, client: socket) -> None:
print(self)
client.sendall(self.build_response())
client.close()

37
src/lib/responsecodes.py Normal file
View File

@ -0,0 +1,37 @@
from enum import Enum
class ResponseCode(Enum):
OK = (200, "OK")
CREATED = (201, "Created")
ACCEPTED = (202, "Accepted")
NO_CONTENT = (204, "No Content")
MOVED_PERMANENTLY = (301, "Moved Permanently")
FOUND = (302, "Found")
BAD_REQUEST = (400, "Bad Request")
UNAUTHORIZED = (401, "Unauthorized")
FORBIDDEN = (403, "Forbidden")
NOT_FOUND = (404, "Not Found")
METHOD_NOT_ALLOWED = (405, "Method Not Allowed")
INTERNAL_SERVER_ERROR = (500, "Internal Server Error")
NOT_IMPLEMENTED = (501, "Not Implemented")
SERVICE_UNAVAILABLE = (503, "Service Unavailable")
code: str
message: int
def __new__(cls, code, message):
obj = object.__new__(cls)
obj.code = code
obj.message = message
return obj
def __str__(self):
return f"{self.code} {self.message}"
@classmethod
def _missing_(cls, value):
for member in cls:
if member.code == value:
return member
return None

159
src/lib/router.py Normal file
View File

@ -0,0 +1,159 @@
from dataclasses import dataclass
from socket import socket
from datetime import datetime
from functools import reduce
from typing import List, Callable, Tuple
from .method import Method
from .response import Response
from .responsecodes import ResponseCode
from .content import *
from .patchers import patchers
import os
@dataclass
class Route:
matcher: Callable
methods: List[Method]
handler: Callable[['Request', socket, Tuple[str, int]], Response]
def method_is_allowed(self, method: Method) -> bool:
return method in self.methods
def execute(self, *args):
try:
response = self.handler(*args)
for patcher in patchers:
response = patcher(response, args[0])
return response
except Exception as e:
print(e)
return error_page(500)
def matches(self, request: 'Request') -> bool:
if not self.method_is_allowed(request.method): return False
return self.matcher(request.path)
routes = [
Route(
lambda request: request.path == '/',
[Method.GET, Method.POST],
lambda request, *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
(parse_file('./home.html', dict(prev='\\/')).encode('utf-8') if request.method == Method.GET else (
[
(lambda form_data: (
(lambda time: (
print('\n\nFORM DATA!!!!',form_data,request, '\n\n'),
f:=open(f'./files/posts-to-homepage/post_{time}.txt', 'w'),
f.write(f"<i style='font-family: MapleMonoItalic'>{form_data['name']}</i>@{time}<br>{form_data['text']}<br><br>"),
f.close()
))(datetime.now().strftime('%Y-%m-%d_%H:%M:%S-%f')[:-3]) if set(form_data.keys()) == set(['text', 'name']) else None
))(
reduce(
lambda acc, d: acc.update(d) or acc,
map(lambda key_value_pair: {key_value_pair[0]: remove_html_tags(key_value_pair[1])}, request.body.data.items()),
{}
)),
parse_file('./home.html').encode('utf-8')
][1]
))
) if len(request.body.data) > 0 or request.method != Method.POST else error_page(ResponseCode.BAD_REQUEST)
),
Route(
lambda path: os.path.isdir('.' + path.path),
[Method.GET],
lambda request, *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
parse_file('./dir_index.html', dict(path='.' + request.path.path, prev=request.headers.get('Referer').replace('/', '\\/') if request.headers.has('Referer') else '')).encode('utf-8')
)
),
Route(
lambda path: os.path.isfile('.' + path.path) and path.path.startswith('/html/') and (path.path.endswith('.html') or '/thoughts/' in path.path),
[Method.GET],
lambda request, *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
parse_file('.' + request.path.path, dict(prev=request.headers.get('Referer').replace('/', '\\/') if request.headers.has('Referer') else '')).encode('utf-8')
)
),
Route(
lambda path: os.path.isfile('.' + path.path) and (path.path.startswith('/font/') or path.path.startswith('/files/')),
[Method.GET],
lambda request, *_: Response(
ResponseCode.OK,
*raw_file_contents('.' + request.path.path)
)
),
Route(
lambda request: request.path == '/status',
[Method.GET],
lambda *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
parse('<style>$[cat style.css]</style>$[neofetch | ansi2html]').encode('utf-8')
)
),
Route(
lambda request: request.path == '/stats/is-its-computer-online',
[Method.GET],
lambda *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
page("online-p", """
seconds since last heartbeat message (less than 60: online; less than 120: maybe; more than 120: probably not): $[echo $(( $(date +%s) - $(stat -c %Y ./files/stats/heartbeat) ))]
""")
)
),
Route(
lambda request: request.path == '/stats/what-song-is-it-listening-to',
[Method.GET],
lambda *_: Response(
ResponseCode.OK,
{'Content-type': 'text/html'},
page("song?", """
it is listening to $[cat ./files/stats/song] as of $[echo $(( $(date +%s) - $(stat -c %Y ./files/stats/song) ))] seconds ago.
""")
)
),
Route(
lambda request: request.path == '/stats/is-this-server-online',
[Method.GET],
lambda *_: Response(
ResponseCode.OK,
{'Content-type': 'text/html'},
page("server online-p", """
I think so.
""")
)
),
Route(
lambda request: request.path == '/stats/what-is-its-servers-uptime',
[Method.GET],
lambda *_: Response(
ResponseCode.OK,
{'Content-type': 'text/html'},
page("uptime", """
$[uptime]
""")
)
),
Route(
lambda request: request.path == '/stats',
[Method.GET],
lambda request, *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
parse_file('./html/stats.html', dict(prev=request.headers.get('Referer').replace('/', '\\/') if request.headers.has('Referer') else '')).encode('utf-8')
)
),
Route(
lambda _: True,
[Method.GET],
lambda *_: error_page(404)
)
]

17
src/lib/server.py Normal file
View File

@ -0,0 +1,17 @@
import socket
import threading
from typing import Callable
def serve(address: str, port: int, callback: Callable, wrapper: Callable[[socket.socket], socket.socket] = lambda s: s) -> None:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
server_socket.bind((address, port))
server_socket.listen(1)
server_socket = wrapper(server_socket)
while True:
conn, addr = server_socket.accept()
client_connection = threading.Thread(target=callback, args=(conn, addr))
client_connection.start()
finally: server_socket.close()

38
src/main.py Normal file
View File

@ -0,0 +1,38 @@
from lib import Request, serve
from typing import Tuple
import threading
import socket
import ssl
import os
os.chdir('site')
def handle_client(client: socket.socket, addr: Tuple[str, int]) -> None:
request = bytes()
while (data := client.recv(1024)):
request += data
print(len(data), data)
if len(data) < 1024: break
(request:=Request.from_bytes(request)) \
.match() \
.execute(request, client, addr) \
.send(client)
def main() -> None:
http_thread = threading.Thread(name='http', target=serve, args=('0.0.0.0', 5000, handle_client))
https_thread = threading.Thread(name='https', target=serve, args=('0.0.0.0', 6001, handle_client), kwargs=dict(wrapper=lambda socket: [
ctx:=ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER),
ctx.load_cert_chain(certfile='./badcert.pem', keyfile='./badkey.pem'),
ctx.wrap_socket(socket, server_side=True)
][-1]
))
http_thread.start()
#https_thread.start()
if __name__ == '__main__':
main()