rewrite http server to use asyncio
This commit is contained in:
@ -1,19 +1,38 @@
|
||||
(import bleach [clean])
|
||||
(import mimetypes [guess-type])
|
||||
(import subprocess [check-output])
|
||||
(import re [sub])
|
||||
(import re)
|
||||
(import os [environ :as hy-env])
|
||||
(import asyncio)
|
||||
(import aiofiles)
|
||||
|
||||
(setv (get hy-env "PATH") (+ (get hy-env "PATH") ":./www/site/scripts"))
|
||||
|
||||
(defn execute-bash [data]
|
||||
(sub r"\$\[(.*?)\]" (fn [sequence]
|
||||
(. (check-output (.group sequence 1) :shell True :executable "/bin/bash" :env hy-env) (decode) (strip)))
|
||||
data))
|
||||
(defn :async execute-bash [data]
|
||||
|
||||
(defn parse-html-file [path [no-exec False] #** kwargs]
|
||||
(with [f (open path "r")]
|
||||
(setv data (.read f)))
|
||||
(defn :async process-match [match]
|
||||
(let [command (.group match 1)
|
||||
process (await (asyncio.create-subprocess-shell
|
||||
command
|
||||
:stdout asyncio.subprocess.PIPE
|
||||
:stderr asyncio.subprocess.PIPE
|
||||
:shell True
|
||||
:executable "/bin/bash"
|
||||
:env hy-env))]
|
||||
(let [[stdout stderr] (await (.communicate process))]
|
||||
(.decode stdout :errors "ignore"))))
|
||||
|
||||
(setv matches (list (re.finditer r"\$\[(.*?)\]" data)))
|
||||
(setv replacements (await (asyncio.gather #* (lfor match matches (process-match match)))))
|
||||
|
||||
(setv result (list data))
|
||||
(for [match (reversed matches)]
|
||||
(setv (cut result (match.start) (match.end)) (get replacements (.index matches match))))
|
||||
(.join "" result))
|
||||
|
||||
(defn :async parse-html-file [path [no-exec False] #** kwargs]
|
||||
(with [:async f (aiofiles.open path "r")]
|
||||
(setv data (await (.read f))))
|
||||
|
||||
(for [[k v] (.items kwargs)]
|
||||
(setv data (.replace data f"{"{"}{k}{"}"}" (str v))))
|
||||
@ -21,15 +40,15 @@
|
||||
(when no-exec
|
||||
(return data))
|
||||
|
||||
(execute-bash data))
|
||||
(await (execute-bash data)))
|
||||
|
||||
(defn send-raw-file [path]
|
||||
(defn :async send-raw-file [path]
|
||||
(setv [mime-type _] (guess-type path))
|
||||
|
||||
(when (not mime-type)
|
||||
(setv mime-type "text/plain"))
|
||||
|
||||
(with [f (open path "rb")]
|
||||
(setv data (.read f)))
|
||||
(with [:async f (aiofiles.open path "rb")]
|
||||
(setv data (await (.read f))))
|
||||
|
||||
(return #({"Content-Type" mime-type "Cache-Control" "max-age=300, stale-while-revalidate=3600"} data)))
|
||||
|
@ -8,11 +8,12 @@
|
||||
(import re)
|
||||
(import functools [lru-cache])
|
||||
(import os.path [isdir :as dir? isfile :as file? abspath])
|
||||
(import asyncio)
|
||||
|
||||
(defn error [code message]
|
||||
(defn :async error [code message]
|
||||
(return (dict
|
||||
:code code
|
||||
:body (parse-html-file "./www/site/html/error.html" :code code :message message))))
|
||||
:body (await (parse-html-file "./www/site/html/error.html" :code code :message message)))))
|
||||
|
||||
(defclass always []
|
||||
(meth __init__ [@value])
|
||||
@ -66,6 +67,10 @@
|
||||
`[method path request ~@(.parse-one-form &reader)]
|
||||
'[method path request]))
|
||||
|
||||
(defreader await
|
||||
(.slurp-space &reader)
|
||||
`(await ~(.parse-one-form &reader)))
|
||||
|
||||
(defn if-file-exists [* base-path otherwise]
|
||||
(fn [f]
|
||||
(fn [method path request]
|
||||
@ -96,14 +101,14 @@
|
||||
:code 303
|
||||
:headers {"Location" f"http://natalieee.net.8.f.9.e.0.7.4.0.1.0.0.2.ip6.arpa{(. request (get "route") (get "unparsed_route"))}"})))))
|
||||
|
||||
(defn shtml-file-response [file [code 200] [no-exec False] [template-params {}]]
|
||||
(defn :async shtml-file-response [file [code 200] [no-exec False] [template-params {}]]
|
||||
(dict
|
||||
:code code
|
||||
:headers {"Connection" "keep-alive" "Keep-Alive" "timeout=5 max=200"}
|
||||
:body (parse-html-file f"./www/site/html/{file}" :no-exec no-exec #** template-params)))
|
||||
:body #await (parse-html-file f"./www/site/html/{file}" :no-exec no-exec #** template-params)))
|
||||
|
||||
(defn raw-file-response [file [code 200]]
|
||||
(dict :code code #** (dict (zip ["headers" "body"] (send-raw-file f"./www/site/{file}")))))
|
||||
(defn :async raw-file-response [file [code 200]]
|
||||
(dict :code code #** (dict (zip ["headers" "body"] #await (send-raw-file f"./www/site/{file}")))))
|
||||
|
||||
(defn+ match-request [{method "method" {path "path"} "route" :as request}]
|
||||
;; (try
|
||||
@ -111,29 +116,29 @@
|
||||
;; (except [Exception]
|
||||
;; (return (error 500 "server error")))))
|
||||
|
||||
(defn [(route "/" "GET")] /home #route-args (shtml-file-response "home.html"))
|
||||
(defn [(route "/html/*" "GET") (if-file-exists :base-path "./www/site/html" :otherwise (error 404 "not found"))] /html/* #route-args (shtml-file-response path))
|
||||
(defn [lru-cache (route "/assets/*" "GET") (if-file-exists :base-path "./www/site/" :otherwise (error 404 "not found"))] /assets/* #route-args (raw-file-response path))
|
||||
(defn [(route "/html/view-thought.html" "GET") (forward-params "thought" "filter-tag")] /html/view-thought #route-args [#** template-args] (shtml-file-response "/html/view-thought.html" :template-params template-args))
|
||||
(defn [(route "/comment" "POST")] /comments #route-args (create-comment request))
|
||||
(defn [lru-cache (route "/robots.txt" "GET") ] /robots #route-args (dict :code 200 :headers {"Content-Type" "text/plain"} :body "User-agent *\nDisallow: /\n"))
|
||||
(defn :async [(route "/" "GET")] /home #route-args #await (shtml-file-response "home.html"))
|
||||
(defn :async [(route "/html/*" "GET") (if-file-exists :base-path "./www/site/html" :otherwise (error 404 "not found"))] /html/* #route-args #await (shtml-file-response path))
|
||||
(defn :async [lru-cache (route "/assets/*" "GET") (if-file-exists :base-path "./www/site/" :otherwise (error 404 "not found"))] /assets/* #route-args #await (raw-file-response path))
|
||||
(defn :async [(route "/html/view-thought.html" "GET") (forward-params "thought" "filter-tag")] /html/view-thought #route-args [#** template-args] #await (shtml-file-response "/html/view-thought.html" :template-params template-args))
|
||||
(defn :async [(route "/comment" "POST")] /comments #route-args (create-comment request))
|
||||
(defn :async [lru-cache (route "/robots.txt" "GET") ] /robots #route-args (dict :code 200 :headers {"Content-Type" "text/plain"} :body "User-agent *\nDisallow: /\n"))
|
||||
|
||||
;; *.arpa web n-gon
|
||||
(setv members (arpa-n-gon.get-members))
|
||||
|
||||
(defn [(route "/arpa-n-gon" GET) (303-if-not-arpa)] /arpa-n-gon #route-args (shtml-file-response "arpa-n-gon.html" :template-params (dict :n_gon (arpa-n-gon.n-gon-name (len members)) :n_gon_inc (arpa-n-gon.n-gon-name (+ (len members) 1)) :n (len members))))
|
||||
(defn :async [(route "/arpa-n-gon" GET) (303-if-not-arpa)] /arpa-n-gon #route-args #await (shtml-file-response "arpa-n-gon.html" :template-params (dict :n_gon (arpa-n-gon.n-gon-name (len members)) :n_gon_inc (arpa-n-gon.n-gon-name (+ (len members) 1)) :n (len members))))
|
||||
|
||||
(defn [(route "/arpa-n-gon/nav" GET) (303-if-not-arpa :unless (fn [request] (in "from-iframe" (. request (get "route") (get "parameters") (keys))))) (forward-params "current" "style") (require-params "current") ] /arpa-n-gon/nav #route-args [current [style None]]
|
||||
(shtml-file-response "arpa-n-gon-nav.html" :no-exec True :template-params (dict
|
||||
(defn :async [(route "/arpa-n-gon/nav" GET) (303-if-not-arpa :unless (fn [request] (in "from-iframe" (. request (get "route") (get "parameters") (keys))))) (forward-params "current" "style") (require-params "current") ] /arpa-n-gon/nav #route-args [current [style None]]
|
||||
#await (shtml-file-response "arpa-n-gon-nav.html" :no-exec True :template-params (dict
|
||||
:style style
|
||||
:next (+ "http://" (get (arpa-n-gon.next-member members current) "arpa-domain"))
|
||||
:prev (+ "http://" (get (arpa-n-gon.prev-member members current) "arpa-domain"))
|
||||
:n_gon (arpa-n-gon.n-gon-name (len members)))))
|
||||
|
||||
(defn [(route "/arpa-n-gon/next" GET) (303-if-not-arpa) (forward-params "current" )(require-params "current")] /arpa-n-gon/next #route-args [current] (dict
|
||||
(defn :async [(route "/arpa-n-gon/next" GET) (303-if-not-arpa) (forward-params "current" )(require-params "current")] /arpa-n-gon/next #route-args [current] (dict
|
||||
:code 303
|
||||
:headers {"Location" (+ "http://" (get (arpa-n-gon.next-member members current) "arpa-domain"))}))
|
||||
|
||||
(defn [(route "/arpa-n-gon/prev" GET) (303-if-not-arpa) (forward-params "current") (require-params "current")] /arpa-n-gon/next #route-args [current] (dict
|
||||
(defn :async [(route "/arpa-n-gon/prev" GET) (303-if-not-arpa) (forward-params "current") (require-params "current")] /arpa-n-gon/next #route-args [current] (dict
|
||||
:code 303
|
||||
:headers {"Location" (+ "http://" (get (arpa-n-gon.prev-member members current) "arpa-domain"))}))
|
||||
|
93
srv/main.hy
93
srv/main.hy
@ -4,71 +4,48 @@
|
||||
(import traceback [format-exc])
|
||||
(import http-utils :as http)
|
||||
(import content.router [match-request])
|
||||
(import asyncio)
|
||||
(import concurrent.futures [ThreadPoolExecutor])
|
||||
|
||||
(require hyrule.control [defmain])
|
||||
(require hyrule.oop [meth])
|
||||
|
||||
(setv data-handler-threadpool (ThreadPoolExecutor))
|
||||
|
||||
(defclass http-server-protocol [asyncio.Protocol]
|
||||
|
||||
(meth connection-made [@transport])
|
||||
|
||||
(meth data-received [data]
|
||||
(setv loop (asyncio.get-running-loop))
|
||||
(loop.run-in-executor data-handler-threadpool (fn []
|
||||
(setv thread-loop (asyncio.new-event-loop))
|
||||
(asyncio.set-event-loop thread-loop)
|
||||
|
||||
(setv parsed-request (http.request.parse-data data))
|
||||
(.debug log parsed-request)
|
||||
(.info log (+ (str (cond
|
||||
(in "X-Real-IP" (. parsed-request (get "headers"))) (. parsed-request (get "headers") (get "X-Real-IP"))
|
||||
True (get (.get-extra-info @transport "peername") 1))) f": {(. parsed-request (get "method"))} {(. parsed-request (get "route") (get "path"))}"))
|
||||
|
||||
(setv response-task (asyncio.ensure-future (match-request parsed-request)))
|
||||
(. response-task (add-done-callback (fn [future]
|
||||
(.write @transport (http.response.send #** (future.result)))
|
||||
|
||||
(when (= (. parsed-request (get "headers") (get "Connection")) "close")
|
||||
(@transport.close)))))
|
||||
|
||||
(thread-loop.run-until-complete response-task)))))
|
||||
|
||||
(try
|
||||
(import srv-config [ADDRESS PORT])
|
||||
(except [ModuleNotFoundError]
|
||||
(setv [ADDRESS PORT] ["127.0.0.1" 5000])))
|
||||
|
||||
(defn handle-connection [client-socket address]
|
||||
(try
|
||||
(.settimeout client-socket 10)
|
||||
|
||||
(setv request-data (bytes))
|
||||
(while (setx data (.recv client-socket 1024))
|
||||
(+= request-data data)
|
||||
(when (< (len data) 1024)
|
||||
(break)))
|
||||
|
||||
(when request-data
|
||||
(setv parsed-request (http.request.parse-data request-data))
|
||||
(.debug log parsed-request)
|
||||
(.info log (+ (str (cond
|
||||
(in "X-Real-IP" (. parsed-request (get "headers"))) (. parsed-request (get "headers") (get "X-Real-IP"))
|
||||
True (get address 0))) f": {(. parsed-request (get "method"))} {(. parsed-request (get "route") (get "path"))}"))
|
||||
|
||||
(when (!= (. parsed-request (get "headers") (get "Connection")) "close")
|
||||
(.start (Thread
|
||||
:target handle-connection
|
||||
:args #((.dup client-socket) address))))
|
||||
|
||||
(setv response (match-request parsed-request))
|
||||
(.sendall client-socket (http.response.send #** response)))
|
||||
|
||||
|
||||
(except [e TimeoutError]
|
||||
...)
|
||||
|
||||
(except [e Exception]
|
||||
(.warn log (format-exc)))
|
||||
|
||||
(finally
|
||||
(.close client-socket)
|
||||
(return))))
|
||||
|
||||
(defmain []
|
||||
(let [socket (socket AF_INET SOCK_STREAM)]
|
||||
(try
|
||||
(.setsockopt socket SOL_SOCKET SO_REUSEADDR 1)
|
||||
(.bind socket #(ADDRESS PORT))
|
||||
(.listen socket 10)
|
||||
(.debug log "socket bound")
|
||||
(asyncio.run ((fn :async []
|
||||
(setv loop (asyncio.get-running-loop))
|
||||
|
||||
(while True
|
||||
(try
|
||||
(.start
|
||||
(Thread
|
||||
:target handle-connection
|
||||
:args #(#* (socket.accept))))
|
||||
|
||||
(except [e Exception]
|
||||
(.warn log (format-exc)))))
|
||||
|
||||
(except [e Exception]
|
||||
(.critical log (format-exc)))
|
||||
|
||||
(finally
|
||||
(.close socket)
|
||||
(.info log "server shut down")))))
|
||||
(with [:async server (await (loop.create-server
|
||||
http-server-protocol ADDRESS PORT))]
|
||||
(await (server.serve-forever)))))))
|
||||
|
Reference in New Issue
Block a user