8 Commits

10 changed files with 53 additions and 118 deletions

4
.gitignore vendored
View File

@ -1,5 +1,7 @@
config.yaml config.yaml
log log
cert.pem
key.pem
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files

View File

@ -1,4 +1,4 @@
# sludge: webthing for natalieee.net # sludge: webthing for ~~natalieee.net~~ puppygirl.systems
it rhymes with kludge. it rhymes with kludge.
## config ## config

View File

@ -11,7 +11,6 @@ pkgs.mkShell {
pkgs.python312Packages.pygraphviz pkgs.python312Packages.pygraphviz
pkgs.python312Packages.requests_toolbelt pkgs.python312Packages.requests_toolbelt
pkgs.python312Packages.pyaml pkgs.python312Packages.pyaml
pkgs.python312Packages.pillow
]; ];
shellHook = '' shellHook = ''
python src/main.py python src/main.py

View File

@ -8,11 +8,11 @@ class Headers:
def has(self, key: str) -> bool: def has(self, key: str) -> bool:
return key in self.headers.keys() return key in self.headers.keys()
def get(self, key: str) -> str | None: def get(self, key: str) -> str:
if self.has(key): if self.has(key):
return self.headers[key] return self.headers[key]
return None return ''
def add(self, key, value) -> None: def add(self, key, value) -> None:
self.headers[key] = value self.headers[key] = value

View File

@ -18,5 +18,5 @@ stream_logger.setFormatter(formatter)
log.addHandler(file_logger) log.addHandler(file_logger)
log.addHandler(stream_logger) log.addHandler(stream_logger)
log.info('log initialized') if not __name__ == 'sludge.src.lib.logger' else ... log.info('log initialized')

View File

@ -2,7 +2,8 @@ from .response import Response
from typing import Callable, List from typing import Callable, List
type Patcher = Callable[[Response, 'Request'], Response] type Patcher = Callable
patchers: List[Patcher] = [ patchers: List[Patcher] = [
lambda response, request: response
] ]

View File

@ -44,7 +44,9 @@ class Request:
body_start = request_str.find('\r\n\r\n') + 4 body_start = request_str.find('\r\n\r\n') + 4
body = Body(request_bytes[body_start:], headers.get('Content-Type') or 'text/plain') body = Body(request_bytes[body_start:], headers.get('Content-Type') or 'text/plain')
log.info(f'received request for {path.path} from {headers.get('X-Real-IP')}') if not 'Nim httpclient' in headers.get('user-agent'):
log.info(f'received request for {path.path} from {headers.get('X-Real-IP')}')
return cls(method, path, version, headers, body) return cls(method, path, version, headers, body)
def match(self): def match(self):

View File

@ -4,7 +4,7 @@ from .responsecodes import ResponseCode
from .logger import log from .logger import log
class Response: class Response:
def __init__(self, code: ResponseCode, headers: Dict[str, str] = dict(), body: bytes = b''): def __init__(self, code: ResponseCode, headers: Dict[str, str], body: bytes):
self.code = code self.code = code
self.headers = headers self.headers = headers
self.body = body self.body = body

View File

@ -11,12 +11,7 @@ from .patchers import patchers
from .logger import log from .logger import log
import os import os
import traceback import traceback
import mimetypes import requests, json
import hashlib
import base64
with open('files/secrets', 'r') as f:
allowed_secrets = f.read().split('\n')
@dataclass @dataclass
class Route: class Route:
@ -43,125 +38,61 @@ class Route:
if not self.method_is_allowed(request.method): return False if not self.method_is_allowed(request.method): return False
return self.matcher(request.path) return self.matcher(request.path)
def generate_opengraph_html(file_url):
mime_type, _ = mimetypes.guess_type(file_url)
file_name = os.path.basename(file_url)
og_meta = ''
twitter_meta = ''
if mime_type and mime_type.startswith('image/'):
content_type = 'image'
embed_html = f'<img src="{file_url}" alt="{file_name}" style="max-width: 100%; height: auto;">'
og_meta = f'''
<meta property="og:image" content="{file_url}" />
<meta property="og:url" content="{file_url}" />
<meta property="og:image:url" content="{file_url}" />
<meta property="og:image:width" content="500" />
'''
elif mime_type and mime_type.startswith('video/'):
content_type = 'video'
embed_html = f'<video controls style="max-width: 100%;"><source src="{file_url}" type="{mime_type}">Your browser does not support the video tag.</video>'
og_meta = f"""
<meta property="og:video" content="{file_url}" />
<meta property="og:video:url" content="{file_url}" />
<meta property="og:video:secure_url" content="{file_url}" />
<meta property="og:video:type" content="{mime_type}" />
<meta property="og:video:width" content="406" />
<meta property="og:video:height" content="720" />
"""
twitter_meta = f"""
<meta name="twitter:card" content="player" />
<meta name="twitter:title" content="{file_name}" />
<meta name="twitter:player" content="{file_url}" />
<meta name="twitter:player:width" content="406" />
<meta name="twitter:player:height" content="720" />
"""
else:
content_type = 'document'
embed_html = f'<iframe src="{file_url}" title="{file_name}" style="width: 100%; height: 600px; border: none;"></iframe>'
og_meta = ''
html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Embed File: {file_name}</title>
<meta property="og:title" content="{file_name}" />
<meta property="og:type" content="{content_type}" />
<meta property="og:url" content="{file_url}" />
{og_meta}
{twitter_meta}
</head>
<body>
<p>{file_name}</p>
<hr>
{embed_html}
</body>
</html>
"""
return html_content
def is_subdict(sub_dict, main_dict):
for key, value in sub_dict.items():
if key not in main_dict or main_dict[key] != value:
return False
return True
def compute_md5(file_path):
md5_hash = hashlib.md5()
with open(file_path, 'rb') as file:
for chunk in iter(lambda: file.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
routes = [ routes = [
Route( Route(
lambda request: request.path == '/', lambda request: request.path == '/',
[Method.GET], [Method.GET],
lambda *_: Response( lambda request, *_: Response(
ResponseCode.OK, ResponseCode.OK,
{'Content-Type': 'text/html'}, {'Content-Type': 'text/html'},
parse_file('index.html').encode('utf-8') parse_file('./index.html').encode('utf-8')
) )
), ),
Route( Route(
lambda request: [print(os.getcwd(), '.'+request.path, request.params, os.path.isfile('.'+request.path)), os.path.isfile('.'+request.path) and is_subdict({'embed': 'true'}, request.params)][-1], lambda request: request.path == '/bracelet.html',
[Method.GET], [Method.POST],
lambda request, *_: Response( lambda request, *_: Response(
ResponseCode.OK, ResponseCode.OK,
{'Content-Type': 'text/html'}, {'Content-Type': 'text/html'},
generate_opengraph_html(f'https://files.natalieee.net{request.path.path}?hash={request.path.params['hash']}').encode('utf-8') (
) requests.post(
"https://api.pavlok.com/api/v5/stimulus/send",
headers = { # who cares about a pavlok api key
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6bnVsbCwiaWQiOjMxNDA3NiwiZW1haWwiOiJvZGV0dGVhbWF0bzA3QGdtYWlsLmNvbSIsImlzX2FwcGxpY2F0aW9uIjpmYWxzZSwiZXhwIjoxNzYxMzcyMzIwLCJzdWIiOiJhY2Nlc3MifQ.8dyKDR6opM8dpzXfnXpIO1VJApx880mN6fCrmVUShxc',
'Content-Type': 'application/json'
},
data = json.dumps({
"stimulus": {
"stimulusType": request.body.data['type'],
"stimulusValue": request.body.data['strength'],
"reason": request.body.data['message'],
}
})
),
parse_file('./bracelet.html').encode('utf-8')
)[-1]
)
), ),
Route( Route(
lambda request: [print(os.getcwd(), '.'+request.path, request.params, os.path.isfile('.'+request.path)), os.path.isfile('.'+request.path)][-1], lambda request: os.path.isfile('.' + request.path) and request.path.endswith('.html'),
[Method.GET], [Method.GET],
lambda request, *_: Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
parse_file('.' + request.path.path).encode('utf-8')
)
),
Route(
lambda request: os.path.isfile('.' + request.path),
[Method.GET],
lambda request, *_: Response( lambda request, *_: Response(
ResponseCode.OK, ResponseCode.OK,
*raw_file_contents('.'+request.path.path) *raw_file_contents('.' + request.path.path)
) if request.path.params['hash'] == compute_md5('.'+request.path.path) else error_page(403) )
), ),
Route( Route(
lambda request: request.path == '/post', lambda _: True,
[Method.POST],
lambda request, *_: [print(request), Response(
ResponseCode.OK,
{'Content-Type': 'text/html'},
(lambda f: [f.write(base64.b64decode(request.body.content)), f.close(), f'https://files.natalieee.net/{request.path.params['filename']}?hash={compute_md5(request.path.params['filename'])}'][-1])(open(request.path.params['filename'], 'wb')).encode('utf-8')
) if request.path.params['secret'] in allowed_secrets else Response(*error_page(403))][-1]
),
Route(
lambda request: True,
[Method.GET], [Method.GET],
lambda *_: Response(*error_page(404)) lambda *_: error_page(404)
) )
] ]

View File

@ -25,7 +25,7 @@ def handle_client(client: socket.socket, addr: Tuple[str, int]) -> None:
.execute(request, client, addr) \ .execute(request, client, addr) \
.send(client) .send(client)
log.info('destroy thread') log.debug('destroy thread')
def main() -> None: def main() -> None:
http_thread = threading.Thread(name='http', target=serve, args=('0.0.0.0', config['http-port'], handle_client)) http_thread = threading.Thread(name='http', target=serve, args=('0.0.0.0', config['http-port'], handle_client))