Compare commits
8 Commits
files.nata
...
main
Author | SHA1 | Date | |
---|---|---|---|
80212a5c03 | |||
bce0d97e23 | |||
cf2fa1f0c0 | |||
2661e44e78 | |||
33807314f4 | |||
81429ec7cb | |||
e8d333e399 | |||
2793325c50 |
2
.gitignore
vendored
2
.gitignore
vendored
@ -1,5 +1,7 @@
|
||||
config.yaml
|
||||
log
|
||||
cert.pem
|
||||
key.pem
|
||||
|
||||
# ---> Python
|
||||
# Byte-compiled / optimized / DLL files
|
||||
|
@ -1,4 +1,4 @@
|
||||
# sludge: webthing for natalieee.net
|
||||
# sludge: webthing for ~~natalieee.net~~ puppygirl.systems
|
||||
it rhymes with kludge.
|
||||
|
||||
## config
|
||||
|
@ -11,7 +11,6 @@ pkgs.mkShell {
|
||||
pkgs.python312Packages.pygraphviz
|
||||
pkgs.python312Packages.requests_toolbelt
|
||||
pkgs.python312Packages.pyaml
|
||||
pkgs.python312Packages.pillow
|
||||
];
|
||||
shellHook = ''
|
||||
python src/main.py
|
||||
|
@ -8,11 +8,11 @@ class Headers:
|
||||
def has(self, key: str) -> bool:
|
||||
return key in self.headers.keys()
|
||||
|
||||
def get(self, key: str) -> str | None:
|
||||
def get(self, key: str) -> str:
|
||||
if self.has(key):
|
||||
return self.headers[key]
|
||||
|
||||
return None
|
||||
return ''
|
||||
|
||||
def add(self, key, value) -> None:
|
||||
self.headers[key] = value
|
||||
|
@ -18,5 +18,5 @@ stream_logger.setFormatter(formatter)
|
||||
log.addHandler(file_logger)
|
||||
log.addHandler(stream_logger)
|
||||
|
||||
log.info('log initialized') if not __name__ == 'sludge.src.lib.logger' else ...
|
||||
log.info('log initialized')
|
||||
|
||||
|
@ -2,7 +2,8 @@ from .response import Response
|
||||
|
||||
from typing import Callable, List
|
||||
|
||||
type Patcher = Callable[[Response, 'Request'], Response]
|
||||
type Patcher = Callable
|
||||
|
||||
patchers: List[Patcher] = [
|
||||
lambda response, request: response
|
||||
]
|
||||
|
@ -44,7 +44,9 @@ class Request:
|
||||
body_start = request_str.find('\r\n\r\n') + 4
|
||||
body = Body(request_bytes[body_start:], headers.get('Content-Type') or 'text/plain')
|
||||
|
||||
log.info(f'received request for {path.path} from {headers.get('X-Real-IP')}')
|
||||
if not 'Nim httpclient' in headers.get('user-agent'):
|
||||
log.info(f'received request for {path.path} from {headers.get('X-Real-IP')}')
|
||||
|
||||
return cls(method, path, version, headers, body)
|
||||
|
||||
def match(self):
|
||||
|
@ -4,7 +4,7 @@ from .responsecodes import ResponseCode
|
||||
from .logger import log
|
||||
|
||||
class Response:
|
||||
def __init__(self, code: ResponseCode, headers: Dict[str, str] = dict(), body: bytes = b''):
|
||||
def __init__(self, code: ResponseCode, headers: Dict[str, str], body: bytes):
|
||||
self.code = code
|
||||
self.headers = headers
|
||||
self.body = body
|
||||
|
@ -11,12 +11,7 @@ from .patchers import patchers
|
||||
from .logger import log
|
||||
import os
|
||||
import traceback
|
||||
import mimetypes
|
||||
import hashlib
|
||||
import base64
|
||||
|
||||
with open('files/secrets', 'r') as f:
|
||||
allowed_secrets = f.read().split('\n')
|
||||
import requests, json
|
||||
|
||||
@dataclass
|
||||
class Route:
|
||||
@ -43,125 +38,61 @@ class Route:
|
||||
if not self.method_is_allowed(request.method): return False
|
||||
return self.matcher(request.path)
|
||||
|
||||
def generate_opengraph_html(file_url):
|
||||
mime_type, _ = mimetypes.guess_type(file_url)
|
||||
file_name = os.path.basename(file_url)
|
||||
|
||||
og_meta = ''
|
||||
twitter_meta = ''
|
||||
|
||||
if mime_type and mime_type.startswith('image/'):
|
||||
content_type = 'image'
|
||||
embed_html = f'<img src="{file_url}" alt="{file_name}" style="max-width: 100%; height: auto;">'
|
||||
og_meta = f'''
|
||||
<meta property="og:image" content="{file_url}" />
|
||||
<meta property="og:url" content="{file_url}" />
|
||||
<meta property="og:image:url" content="{file_url}" />
|
||||
<meta property="og:image:width" content="500" />
|
||||
'''
|
||||
elif mime_type and mime_type.startswith('video/'):
|
||||
content_type = 'video'
|
||||
embed_html = f'<video controls style="max-width: 100%;"><source src="{file_url}" type="{mime_type}">Your browser does not support the video tag.</video>'
|
||||
|
||||
og_meta = f"""
|
||||
<meta property="og:video" content="{file_url}" />
|
||||
<meta property="og:video:url" content="{file_url}" />
|
||||
<meta property="og:video:secure_url" content="{file_url}" />
|
||||
<meta property="og:video:type" content="{mime_type}" />
|
||||
<meta property="og:video:width" content="406" />
|
||||
<meta property="og:video:height" content="720" />
|
||||
"""
|
||||
|
||||
twitter_meta = f"""
|
||||
<meta name="twitter:card" content="player" />
|
||||
<meta name="twitter:title" content="{file_name}" />
|
||||
<meta name="twitter:player" content="{file_url}" />
|
||||
<meta name="twitter:player:width" content="406" />
|
||||
<meta name="twitter:player:height" content="720" />
|
||||
"""
|
||||
else:
|
||||
content_type = 'document'
|
||||
embed_html = f'<iframe src="{file_url}" title="{file_name}" style="width: 100%; height: 600px; border: none;"></iframe>'
|
||||
og_meta = ''
|
||||
|
||||
html_content = f"""
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>Embed File: {file_name}</title>
|
||||
|
||||
<meta property="og:title" content="{file_name}" />
|
||||
<meta property="og:type" content="{content_type}" />
|
||||
<meta property="og:url" content="{file_url}" />
|
||||
{og_meta}
|
||||
{twitter_meta}
|
||||
</head>
|
||||
<body>
|
||||
<p>{file_name}</p>
|
||||
<hr>
|
||||
{embed_html}
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
return html_content
|
||||
|
||||
def is_subdict(sub_dict, main_dict):
|
||||
for key, value in sub_dict.items():
|
||||
if key not in main_dict or main_dict[key] != value:
|
||||
return False
|
||||
return True
|
||||
|
||||
def compute_md5(file_path):
|
||||
md5_hash = hashlib.md5()
|
||||
|
||||
with open(file_path, 'rb') as file:
|
||||
for chunk in iter(lambda: file.read(4096), b""):
|
||||
md5_hash.update(chunk)
|
||||
|
||||
return md5_hash.hexdigest()
|
||||
|
||||
routes = [
|
||||
Route(
|
||||
lambda request: request.path == '/',
|
||||
[Method.GET],
|
||||
lambda *_: Response(
|
||||
ResponseCode.OK,
|
||||
{'Content-Type': 'text/html'},
|
||||
parse_file('index.html').encode('utf-8')
|
||||
)
|
||||
),
|
||||
Route(
|
||||
lambda request: [print(os.getcwd(), '.'+request.path, request.params, os.path.isfile('.'+request.path)), os.path.isfile('.'+request.path) and is_subdict({'embed': 'true'}, request.params)][-1],
|
||||
[Method.GET],
|
||||
lambda request, *_: Response(
|
||||
ResponseCode.OK,
|
||||
{'Content-Type': 'text/html'},
|
||||
generate_opengraph_html(f'https://files.natalieee.net{request.path.path}?hash={request.path.params['hash']}').encode('utf-8')
|
||||
parse_file('./index.html').encode('utf-8')
|
||||
)
|
||||
),
|
||||
Route(
|
||||
lambda request: [print(os.getcwd(), '.'+request.path, request.params, os.path.isfile('.'+request.path)), os.path.isfile('.'+request.path)][-1],
|
||||
[Method.GET],
|
||||
lambda request, *_: Response(
|
||||
ResponseCode.OK,
|
||||
*raw_file_contents('.'+request.path.path)
|
||||
) if request.path.params['hash'] == compute_md5('.'+request.path.path) else error_page(403)
|
||||
),
|
||||
Route(
|
||||
lambda request: request.path == '/post',
|
||||
lambda request: request.path == '/bracelet.html',
|
||||
[Method.POST],
|
||||
lambda request, *_: [print(request), Response(
|
||||
lambda request, *_: Response(
|
||||
ResponseCode.OK,
|
||||
{'Content-Type': 'text/html'},
|
||||
(lambda f: [f.write(base64.b64decode(request.body.content)), f.close(), f'https://files.natalieee.net/{request.path.params['filename']}?hash={compute_md5(request.path.params['filename'])}'][-1])(open(request.path.params['filename'], 'wb')).encode('utf-8')
|
||||
) if request.path.params['secret'] in allowed_secrets else Response(*error_page(403))][-1]
|
||||
(
|
||||
requests.post(
|
||||
"https://api.pavlok.com/api/v5/stimulus/send",
|
||||
headers = { # who cares about a pavlok api key
|
||||
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6bnVsbCwiaWQiOjMxNDA3NiwiZW1haWwiOiJvZGV0dGVhbWF0bzA3QGdtYWlsLmNvbSIsImlzX2FwcGxpY2F0aW9uIjpmYWxzZSwiZXhwIjoxNzYxMzcyMzIwLCJzdWIiOiJhY2Nlc3MifQ.8dyKDR6opM8dpzXfnXpIO1VJApx880mN6fCrmVUShxc',
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
data = json.dumps({
|
||||
"stimulus": {
|
||||
"stimulusType": request.body.data['type'],
|
||||
"stimulusValue": request.body.data['strength'],
|
||||
"reason": request.body.data['message'],
|
||||
}
|
||||
})
|
||||
),
|
||||
parse_file('./bracelet.html').encode('utf-8')
|
||||
)[-1]
|
||||
)
|
||||
),
|
||||
Route(
|
||||
lambda request: True,
|
||||
lambda request: os.path.isfile('.' + request.path) and request.path.endswith('.html'),
|
||||
[Method.GET],
|
||||
lambda *_: Response(*error_page(404))
|
||||
lambda request, *_: Response(
|
||||
ResponseCode.OK,
|
||||
{'Content-Type': 'text/html'},
|
||||
parse_file('.' + request.path.path).encode('utf-8')
|
||||
)
|
||||
),
|
||||
Route(
|
||||
lambda request: os.path.isfile('.' + request.path),
|
||||
[Method.GET],
|
||||
lambda request, *_: Response(
|
||||
ResponseCode.OK,
|
||||
*raw_file_contents('.' + request.path.path)
|
||||
)
|
||||
),
|
||||
Route(
|
||||
lambda _: True,
|
||||
[Method.GET],
|
||||
lambda *_: error_page(404)
|
||||
)
|
||||
]
|
||||
|
||||
|
@ -25,7 +25,7 @@ def handle_client(client: socket.socket, addr: Tuple[str, int]) -> None:
|
||||
.execute(request, client, addr) \
|
||||
.send(client)
|
||||
|
||||
log.info('destroy thread')
|
||||
log.debug('destroy thread')
|
||||
|
||||
def main() -> None:
|
||||
http_thread = threading.Thread(name='http', target=serve, args=('0.0.0.0', config['http-port'], handle_client))
|
||||
|
Reference in New Issue
Block a user