remove astal bindings, as they have been relocated to its fork of libastal
This commit is contained in:
parent
811b745f51
commit
f64f06b35b
@ -1,7 +0,0 @@
|
||||
import gi
|
||||
|
||||
from .binding import bind
|
||||
from .variable import Variable
|
||||
from .file import *
|
||||
from .time import *
|
||||
from .process import *
|
@ -1,31 +0,0 @@
|
||||
import gi
|
||||
|
||||
gi.require_version("GObject", "2.0")
|
||||
|
||||
from gi.repository import GObject
|
||||
|
||||
class Binding(GObject.Object):
|
||||
def __init__(self, emitter: GObject.GObject, property: str | None = None, transform_fn = lambda x: x):
|
||||
self.emitter = emitter
|
||||
self.property = property
|
||||
self.transform_fn = transform_fn
|
||||
|
||||
def get(self):
|
||||
return self.transform_fn(self.emitter.get_property(self.property) if self.property else self.emitter.get())
|
||||
|
||||
def transform(self, fn):
|
||||
return Binding(self.emitter, self.property, lambda x: fn(self.transform_fn(x)))
|
||||
|
||||
def subscribe(self, callback):
|
||||
id = self.emitter.connect(
|
||||
f'notify::{self.property}' if self.property else 'changed',
|
||||
lambda gobject, _=None: callback(self.transform_fn(self.emitter.get_property(self.property) if self.property else self.emitter.get()))
|
||||
)
|
||||
|
||||
def unsubscribe(_=None):
|
||||
self.emitter.disconnect(id)
|
||||
|
||||
return unsubscribe
|
||||
|
||||
def bind(*args, **kwargs):
|
||||
return Binding(*args, **kwargs)
|
@ -1,30 +0,0 @@
|
||||
import gi
|
||||
from typing import Callable
|
||||
|
||||
gi.require_version("AstalIO", "0.1")
|
||||
gi.require_version("GObject", "2.0")
|
||||
|
||||
from gi.repository import AstalIO, GObject
|
||||
|
||||
def read_file(fp: str) -> str:
|
||||
return AstalIO.read_file(fp)
|
||||
|
||||
def read_file_async(fp: str, callback: Callable[[str | None, Exception | None], None]) -> None:
|
||||
try:
|
||||
AstalIO.read_file_async(fp, lambda _, res: callback(AstalIO.read_file_finish(res), None))
|
||||
|
||||
except Exception as e:
|
||||
callback(None, e)
|
||||
|
||||
def write_file(fp: str, content: str) -> None:
|
||||
AstalIO.write_file(fp, content)
|
||||
|
||||
def write_file_async(fp: str, content: str, callback: Callable[[Exception], None]) -> None:
|
||||
try:
|
||||
AstalIO.write_file_async(fp, content, lambda _, res: AstalIO.write_file_finish(res))
|
||||
|
||||
except Exception as e:
|
||||
callback(e)
|
||||
|
||||
def monitor_file(fp: str, callback: Callable[[str, int], None]) -> None:
|
||||
return AstalIO.monitor_file(fp, callback)
|
@ -1,12 +0,0 @@
|
||||
import gi
|
||||
|
||||
gi.require_version("Gtk", "3.0")
|
||||
gi.require_version("Gdk", "3.0")
|
||||
gi.require_version("Astal", "3.0")
|
||||
|
||||
from .app import App
|
||||
from .astalify import astalify
|
||||
from .widget import Widget
|
||||
|
||||
from gi.repository import Gtk, Gdk, Astal
|
||||
|
@ -1,61 +0,0 @@
|
||||
import gi, sys
|
||||
|
||||
from typing import Callable, Optional, Any, Dict
|
||||
|
||||
gi.require_version("Astal", "3.0")
|
||||
gi.require_version("AstalIO", "0.1")
|
||||
from gi.repository import Astal, AstalIO, Gio
|
||||
|
||||
type Config = Dict[str, Any]
|
||||
|
||||
class AstalPy(Astal.Application):
|
||||
request_handler: Optional[Callable[[str, Callable[[Any], None]], None]] = None
|
||||
|
||||
def do_astal_application_request(self, msg: str, conn: Gio.SocketConnection):
|
||||
if callable(self.request_handler):
|
||||
def respond(response: Any):
|
||||
AstalIO.write_sock(conn, str(response), None, None)
|
||||
self.request_handler(msg, respond)
|
||||
else:
|
||||
AstalIO.Application.do_request(self, msg, conn)
|
||||
|
||||
def quit(self, code: int = 0):
|
||||
super().quit()
|
||||
sys.exit(code)
|
||||
|
||||
def apply_css(self, css: str, reset: bool = False):
|
||||
super().apply_css(css, reset)
|
||||
|
||||
def start(self, **config: Any):
|
||||
config.setdefault("client", lambda *_: (print(f'Astal instance "{self.get_instance_name()}" is already running'), sys.exit(1)))
|
||||
config.setdefault("hold", True)
|
||||
|
||||
self.request_handler = config.get("request_handler")
|
||||
|
||||
if "css" in config:
|
||||
self.apply_css(config["css"])
|
||||
if "icons" in config:
|
||||
self.add_icons(config["icons"])
|
||||
|
||||
for key in ["instance_name", "gtk_theme", "icon_theme", "cursor_theme"]:
|
||||
if key in config:
|
||||
self.set_property(key, config[key])
|
||||
|
||||
def on_activate(_):
|
||||
if callable(config.get("main")):
|
||||
config["main"]()
|
||||
if config["hold"]:
|
||||
self.hold()
|
||||
|
||||
self.connect("activate", on_activate)
|
||||
|
||||
try:
|
||||
self.acquire_socket()
|
||||
except Exception:
|
||||
return config["client"](lambda msg: AstalIO.send_message(self.get_instance_name(), msg), *sys.argv[1:])
|
||||
|
||||
self.run()
|
||||
|
||||
return self
|
||||
|
||||
App = AstalPy()
|
@ -1,107 +0,0 @@
|
||||
import gi
|
||||
from functools import partial, singledispatch
|
||||
|
||||
from typing import Callable, List
|
||||
|
||||
from astal.binding import Binding, bind
|
||||
from astal.variable import Variable
|
||||
|
||||
gi.require_version("Astal", "3.0")
|
||||
gi.require_version("AstalIO", "0.1")
|
||||
gi.require_version("Gtk", "3.0")
|
||||
gi.require_version("GObject", "2.0")
|
||||
from gi.repository import Astal, Gtk, GObject
|
||||
|
||||
def astalify(widget: Gtk.Widget):
|
||||
class Widget(widget):
|
||||
__gtype_name__ = "AstalPy" + widget.__name__
|
||||
class_name = ''
|
||||
|
||||
def hook(self, object: GObject.Object, signal_or_callback: str | Callable, callback: Callable = lambda _, x: x):
|
||||
if isinstance(signal_or_callback, Callable):
|
||||
callback = signal_or_callback
|
||||
|
||||
if isinstance(object, Variable):
|
||||
unsubscribe = object.subscribe(callback)
|
||||
|
||||
else:
|
||||
if isinstance(signal_or_callback, Callable): return
|
||||
|
||||
if 'notify::' in signal_or_callback:
|
||||
id = object.connect(f'{signal_or_callback}', lambda obj, *_: callback(self, object.get_property(signal_or_callback.replace('notify::', '').replace('-', '_')) if signal_or_callback.replace('notify::', '') in [*map(lambda x: x.name, object.list_properties())] else None))
|
||||
|
||||
else:
|
||||
id = object.connect(signal_or_callback, lambda _, value, *args: callback(self, value) if not args else callback(self, value, *args))
|
||||
|
||||
unsubscribe = lambda _=None: object.disconnect(id)
|
||||
|
||||
self.connect('destroy', unsubscribe)
|
||||
|
||||
def toggle_class_name(self, name: str, state: bool | None = None):
|
||||
Astal.widget_toggle_class_name(self, name, state if state is not None else not name in Astal.widget_get_class_names(self))
|
||||
|
||||
@GObject.Property(type=str)
|
||||
def class_name(self):
|
||||
return ' '.join(Astal.widget_get_class_names(self))
|
||||
|
||||
@class_name.setter
|
||||
def class_name(self, name):
|
||||
Astal.widget_set_class_names(self, name.split(' '))
|
||||
|
||||
@GObject.Property(type=str)
|
||||
def css(self):
|
||||
return Astal.widget_get_css(self)
|
||||
|
||||
@css.setter
|
||||
def css(self, css: str):
|
||||
Astal.widget_set_css(self, css)
|
||||
|
||||
@GObject.Property(type=str)
|
||||
def cursor(self):
|
||||
return Astal.widget_get_cursor(self)
|
||||
|
||||
@cursor.setter
|
||||
def cursor(self, cursor: str):
|
||||
Astal.widget_set_cursor(self, cursor)
|
||||
|
||||
@GObject.Property(type=str)
|
||||
def click_through(self):
|
||||
return Astal.widget_get_click_through(self)
|
||||
|
||||
@click_through.setter
|
||||
def click_through(self, click_through: str):
|
||||
Astal.widget_set_click_through(self, click_through)
|
||||
|
||||
if widget == Astal.Box or widget == Gtk.Box:
|
||||
@GObject.Property()
|
||||
def children(self):
|
||||
return Astal.Box.get_children(self)
|
||||
|
||||
@children.setter
|
||||
def children(self, children):
|
||||
Astal.Box.set_children(self, children)
|
||||
|
||||
def __init__(self, **props):
|
||||
super().__init__()
|
||||
|
||||
self.set_visible(props.get("visible", True))
|
||||
|
||||
for prop, value in props.items():
|
||||
if isinstance(value, Binding):
|
||||
self.set_property(prop, value.get())
|
||||
unsubscribe = value.subscribe(partial(self.set_property, prop))
|
||||
self.connect('destroy', unsubscribe)
|
||||
|
||||
elif 'on_' == prop[0:3] and isinstance(value, Callable):
|
||||
self.connect(prop.replace('on_', '', 1), value)
|
||||
|
||||
elif prop.replace('_', '-') in map(lambda x: x.name, self.props):
|
||||
self.set_property(prop.replace('_', '-'), value)
|
||||
|
||||
elif prop == 'setup' and isinstance(value, Callable):
|
||||
value(self)
|
||||
|
||||
else:
|
||||
self.__setattr__(prop, value)
|
||||
|
||||
return Widget
|
@ -1,33 +0,0 @@
|
||||
import gi
|
||||
|
||||
from .astalify import astalify
|
||||
from enum import Enum
|
||||
|
||||
gi.require_version("Astal", "3.0")
|
||||
gi.require_version("Gtk", "3.0")
|
||||
|
||||
from gi.repository import Astal, Gtk
|
||||
|
||||
class CallableEnum(Enum):
|
||||
def __call__(self, *args, **kwargs):
|
||||
return self.value(*args, **kwargs)
|
||||
|
||||
class Widget(CallableEnum):
|
||||
Box = astalify(Astal.Box)
|
||||
Button = astalify(Astal.Button)
|
||||
CenterBox = astalify(Astal.CenterBox)
|
||||
CircularProgress = astalify(Astal.CircularProgress)
|
||||
DrawingArea = astalify(Gtk.DrawingArea)
|
||||
Entry = astalify(Gtk.Entry)
|
||||
EventBox = astalify(Astal.EventBox)
|
||||
Icon = astalify(Astal.Icon)
|
||||
Label = astalify(Gtk.Label)
|
||||
LevelBar = astalify(Astal.LevelBar)
|
||||
MenuButton = astalify(Gtk.MenuButton)
|
||||
Overlay = astalify(Astal.Overlay)
|
||||
Revealer = astalify(Gtk.Revealer)
|
||||
Scrollable = astalify(Astal.Scrollable)
|
||||
Slider = astalify(Astal.Slider)
|
||||
Stack = astalify(Astal.Stack)
|
||||
Switch = astalify(Gtk.Switch)
|
||||
Window = astalify(Astal.Window)
|
@ -1,48 +0,0 @@
|
||||
import gi, sys
|
||||
|
||||
from typing import List, Callable
|
||||
|
||||
gi.require_version("AstalIO", "0.1")
|
||||
|
||||
from gi.repository import AstalIO
|
||||
|
||||
def subprocess(command: str | List[str], output=None, error=None) -> AstalIO.Process | None:
|
||||
if not output:
|
||||
output = lambda proc, x: sys.stdout.write(x + '\n')
|
||||
|
||||
if not error:
|
||||
error = lambda proc, x: sys.stderr.write(x + '\n')
|
||||
|
||||
if isinstance(command, list):
|
||||
proc = AstalIO.Process.subprocessv(command)
|
||||
|
||||
else:
|
||||
proc = AstalIO.Process.subprocess(command)
|
||||
|
||||
proc.connect('stdout', output)
|
||||
proc.connect('stderr', error)
|
||||
|
||||
return proc
|
||||
|
||||
def exec(command: str | List[str]) -> str:
|
||||
if isinstance(command, list):
|
||||
return AstalIO.Process.execv(command)
|
||||
|
||||
else:
|
||||
return AstalIO.Process.exec(command)
|
||||
|
||||
def exec_async(command: str | List[str], callback: Callable[[str, str], None] | None = None) -> None:
|
||||
def default_callback(output, error=None):
|
||||
if error:
|
||||
sys.stderr.write(error + '\n')
|
||||
|
||||
else: sys.stdout.write(output + '\n')
|
||||
|
||||
if not callback:
|
||||
callback = default_callback
|
||||
|
||||
if isinstance(command, list):
|
||||
AstalIO.Process.exec_asyncv(command, lambda _, res: callback(AstalIO.Process.exec_asyncv_finish(res)))
|
||||
|
||||
else:
|
||||
AstalIO.Process.exec_async(command, lambda _, res: callback(AstalIO.Process.exec_finish(res)))
|
@ -1,18 +0,0 @@
|
||||
import gi
|
||||
|
||||
from typing import Callable
|
||||
|
||||
gi.require_version("AstalIO", "0.1")
|
||||
gi.require_version("GObject", "2.0")
|
||||
|
||||
from gi.repository import AstalIO, GObject
|
||||
|
||||
def interval(interval: int, callback: Callable) -> AstalIO.Time:
|
||||
return AstalIO.Time.interval(interval, callback)
|
||||
|
||||
def timeout(timeout: int, callback: Callable) -> AstalIO.Time:
|
||||
return AstalIO.Time.timeout(timeout, callback)
|
||||
|
||||
def idle(callback: Callable) -> AstalIO.Time:
|
||||
return AstalIO.Time.idle(callback)
|
||||
|
@ -1,161 +0,0 @@
|
||||
import gi
|
||||
import asyncio
|
||||
from .process import *
|
||||
from .binding import Binding
|
||||
|
||||
gi.require_version("Astal", "3.0")
|
||||
gi.require_version("AstalIO", "0.1")
|
||||
gi.require_version("GObject", "2.0")
|
||||
|
||||
from gi.repository import Astal, AstalIO, GObject
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Callable, List, Optional, Union
|
||||
|
||||
class Variable(AstalIO.VariableBase):
|
||||
def __init__(self, init_value=None):
|
||||
super().__init__()
|
||||
self.value = init_value
|
||||
self.watch_proc = None
|
||||
self.poll_interval = None
|
||||
self.poll_exec = None
|
||||
self.poll_transform = None
|
||||
self.poll_fn = None
|
||||
self.poll_timer = None
|
||||
self.watch_transform = None
|
||||
self.watch_exec = []
|
||||
|
||||
self.connect('dropped', self._on_dropped)
|
||||
|
||||
def __del__(self):
|
||||
self.emit_dropped()
|
||||
|
||||
def __call__(self, transform: Callable = lambda x: x):
|
||||
return Binding(self).transform(transform)
|
||||
|
||||
def subscribe(self, callback):
|
||||
id = self.emitter.connect(
|
||||
'changed',
|
||||
lambda gobject, _=None: callback(self.emitter.get_value())
|
||||
)
|
||||
|
||||
def unsubscribe(_=None):
|
||||
self.emit_dropped()
|
||||
self.emitter.disconnect(id)
|
||||
|
||||
return unsubscribe
|
||||
|
||||
def get_value(self):
|
||||
return self.value
|
||||
|
||||
def set_value(self, new_value):
|
||||
self.value = new_value
|
||||
self.emit_changed()
|
||||
|
||||
def get(self):
|
||||
return self.value
|
||||
|
||||
def set(self, new_value):
|
||||
self.value = new_value
|
||||
self.emit_changed()
|
||||
|
||||
def poll(self, interval, exec, transform=lambda x: x):
|
||||
self.stop_poll()
|
||||
self.poll_transform = transform
|
||||
self.poll_interval = interval
|
||||
if isinstance(exec, Callable):
|
||||
self.poll_fn = exec
|
||||
|
||||
else:
|
||||
self.poll_exec = exec
|
||||
|
||||
self.start_poll()
|
||||
return self
|
||||
|
||||
def start_poll(self):
|
||||
if self.is_polling(): return
|
||||
if not self.poll_transform: return
|
||||
|
||||
if self.poll_fn:
|
||||
self.poll_timer = AstalIO.Time.interval(self.poll_interval, lambda: self.set_value(self.poll_transform(self.poll_fn(self.get_value()))))
|
||||
|
||||
else:
|
||||
self.poll_timer = AstalIO.Time.interval(
|
||||
self.poll_interval,
|
||||
lambda: exec_async(self.poll_exec, lambda out, err=None:
|
||||
self.set_value(self.poll_transform(out, self.get_value()))
|
||||
)
|
||||
)
|
||||
|
||||
def stop_poll(self):
|
||||
if self.is_polling():
|
||||
self.poll_timer.cancel()
|
||||
self.poll_timer = None
|
||||
|
||||
return self
|
||||
|
||||
def watch(self, exec, transform=lambda x, _: x):
|
||||
self.stop_watch()
|
||||
self.watch_transform = transform
|
||||
self.watch_exec = exec
|
||||
self.start_watch()
|
||||
return self
|
||||
|
||||
def start_watch(self):
|
||||
if self.is_watching(): return
|
||||
if not self.watch_transform: return
|
||||
|
||||
self.watch_proc = subprocess(
|
||||
self.watch_exec,
|
||||
lambda _, out: self.set_value(self.watch_transform(out, self.get_value())),
|
||||
)
|
||||
|
||||
def stop_watch(self):
|
||||
if self.is_watching():
|
||||
self.watch_proc.kill()
|
||||
self.watch_proc = None
|
||||
|
||||
return self
|
||||
|
||||
def _on_dropped(self, *_):
|
||||
if self.is_polling():
|
||||
self.stop_poll()
|
||||
if self.is_watching():
|
||||
self.stop_watch()
|
||||
|
||||
def is_polling(self):
|
||||
return self.poll_timer != None
|
||||
|
||||
def is_watching(self):
|
||||
return self.watch_proc != None
|
||||
|
||||
def observe(self, object, signal_or_callback, callback=lambda _, x: x):
|
||||
if isinstance(signal_or_callback, str):
|
||||
f = callback
|
||||
|
||||
else:
|
||||
f = signal_or_callback
|
||||
|
||||
set = lambda *args: self.set(f(*args))
|
||||
|
||||
if isinstance(signal_or_callback, str):
|
||||
object.connect(signal_or_callback, set)
|
||||
|
||||
if isinstance(object, list):
|
||||
for connectable, signal in object:
|
||||
connectable.connect(signal, set)
|
||||
|
||||
return self
|
||||
|
||||
@classmethod
|
||||
def derive(cls, objects: List[Union[Binding, 'Variable']], transform=lambda *args: args):
|
||||
update = lambda: transform(*map(lambda object: object.get(), objects))
|
||||
|
||||
derived = Variable(update())
|
||||
|
||||
unsubs = [*map(lambda object: object.subscribe(lambda *_: derived.set(update())), objects)]
|
||||
|
||||
derived.connect('dropped', lambda *_: map(lambda unsub: unsub(), unsubs))
|
||||
|
||||
return derived
|
Loading…
x
Reference in New Issue
Block a user