204 lines
6.0 KiB
Python
204 lines
6.0 KiB
Python
# Adapted from the python-wayland example examples/20-list-monitors.py
# (https://python-wayland.org/examples/).
# Uses zwlr_gamma_control_manager_v1:
# https://python-wayland.org/wayland/zwlr_gamma_control_manager_v1/
|
|
|
|
# Default gamma ramps: a two-point ramp from 0 to 1 for each colour channel.
# Each channel gets its own list object; `execute` rebinds these globals.
r, g, b = ([0, 1] for _ in range(3))
|
|
|
|
import os, sys
|
|
from time import sleep
|
|
import wayland
|
|
from wayland.client import wayland_class
|
|
from itertools import chain
|
|
|
|
@wayland_class("wl_registry")
class Registry(wayland.wl_registry):
    """Registry proxy that binds outputs and the gamma-control manager.

    Attributes:
        gamma: the bound ``zwlr_gamma_control_manager_v1`` object, or ``None``
            while that global has not been seen.
        outputs: every bound ``wl_output`` object, in the order seen.
        gammas: gamma-control objects obtained from ``gamma`` for the outputs.
    """

    def __init__(self):
        super().__init__()
        self.gamma = None
        self.outputs = []
        self.gammas = []

    def on_global(self, name, interface, version):
        """Bind the two interfaces this script cares about; ignore the rest.

        The manager and the outputs may show up in either order, so gamma
        controls are created from whichever side arrives second: the manager
        branch covers all outputs seen so far, the output branch covers
        outputs seen after the manager.
        """
        if interface == "zwlr_gamma_control_manager_v1":
            manager = self.bind(name, interface, version)
            self.gamma = manager
            self.gammas = [manager.get_gamma_control(out) for out in self.outputs]
        elif interface == "wl_output":
            bound = self.bind(name, interface, version)
            self.outputs.append(bound)
            if self.gamma is not None:
                self.gammas.append(self.gamma.get_gamma_control(bound))
|
|
|
|
@wayland_class("wl_output")
class Output(wayland.wl_output):
    """Output proxy that records whether its ``done`` handler has run."""

    def __init__(self):
        super().__init__()
        # Flipped to True by on_done(); the start-up loop polls this flag.
        self.done = False

    def on_done(self):
        """Mark this output as done."""
        self.done = True
|
|
|
|
@wayland_class("zwlr_gamma_control_v1")
class GammaControl(wayland.zwlr_gamma_control_v1):
    """Gamma-control proxy that records the reported ramp size.

    Attributes:
        done: ``True`` once either ``on_gamma_size`` or ``on_failed`` has run.
        size: the size passed to ``on_gamma_size``; stays ``None`` if only
            ``on_failed`` ran, which makes ``commit`` skip this control.
    """

    def __init__(self):
        super().__init__()
        self.size = None
        self.done = False

    def on_gamma_size(self, size):
        """Store the reported ramp size and mark this control as done."""
        self.size, self.done = size, True

    def on_failed(self):
        """Mark this control as done without a usable size."""
        self.done = True
|
|
|
|
# Connect and fetch the registry; its handlers fill in outputs and gammas.
display = wayland.wl_display()
registry = display.get_registry()

# Keep dispatching until at least one output is known and every output and
# gamma control seen so far has set its `done` flag.
while not (
    registry.outputs
    and all(obj.done for obj in chain(registry.outputs, registry.gammas))
):
    display.dispatch_timeout(0.1)
|
|
|
|
def lerp(a, b, f):
    """Linearly interpolate between ``a`` (at ``f == 0``) and ``b`` (at ``f == 1``)."""
    return a + f * (b - a)


def _u16_le(value):
    """Return ``value`` (1.0 = full scale) as the two bytes of a little-endian u16.

    Values outside 0-1 are not clamped; they wrap around through the masks.
    """
    n = int(value * 65535)
    return n & 0xFF, (n >> 8) & 0xFF


def scale_ramp(ramp, size):
    """Expand a list of control points into a ``size``-entry gamma table.

    Args:
        ramp: at least two control points. A point is either a plain number
            (the value; such points are spread evenly by index) or a
            ``(value, position)`` tuple, where ``position`` is a fraction of
            the table length. A position on the first point is ignored: the
            table always starts at index 0.
        size: number of table entries to produce; must be at least 2.

    Yields:
        Exactly ``2 * size`` ints in 0-255: each entry as a little-endian
        u16, low byte first.

    Raises:
        ValueError: if ``size < 2`` or ``ramp`` has fewer than two points
            (raised when iteration starts, since this is a generator).
    """
    if size < 2 or len(ramp) < 2:
        raise ValueError("a ramp needs at least two points and size must be >= 2")

    last = size - 1
    ranges = len(ramp) - 1

    curr = ramp[0]
    if isinstance(curr, tuple):
        curr, _ = curr
    end = 0

    for i in range(1, len(ramp)):
        prev, start = curr, end
        point = ramp[i]
        if isinstance(point, tuple):
            curr, position = point
            end = int(last * position)
        else:
            curr = point
            end = last * i // ranges
        # Never run past the final entry: the caller concatenates the three
        # channel tables, so an over-long table would shift the next channel.
        end = min(end, last)

        width = end - start
        if width <= 0:
            # Point sits at or before the current index: emit nothing, but
            # `curr` is already updated, so the next segment starts from it.
            end = start
            continue
        for j in range(width):
            # linearly interpolate and convert to u16
            yield from _u16_le(lerp(prev, curr, j / width))

    # Hold the final value from the last reached index to the end of the
    # table, so the output is always `size` entries even when the last point
    # carries a position below 1.
    tail = _u16_le(curr)
    for _ in range(end, size):
        yield from tail
|
|
|
|
def parse_ramp(line, prev):
    """Parse a ramp written with values on a 0-1 scale.

    Args:
        line: whitespace-separated tokens; each is a number or an ``a:b``
            pair, which becomes the tuple ``(float(a), float(b))``.
        prev: the ramp to show when ``line`` is blank.

    Returns:
        The parsed list of floats/tuples, or ``prev`` itself when ``line``
        is blank; in that case ``prev`` is also printed in the input syntax.

    Raises:
        ValueError: if a token is not a valid number.
    """
    tokens = line.split()
    if not tokens:
        rendered = []
        for point in prev:
            if isinstance(point, tuple):
                first, second = point
                rendered.append(f"{first}:{second}")
            else:
                rendered.append(str(point))
        print(" ".join(rendered))
        return prev

    points = []
    for token in tokens:
        if ":" in token:
            fields = token.split(":")
            points.append((float(fields[0]), float(fields[1])))
        else:
            points.append(float(token))
    return points
|
|
def parse_ramp_int(line, prev):
    """Parse a ramp written on a 0-255 scale into 0-1 floats.

    Args:
        line: whitespace-separated tokens; each is a number or an ``a:b``
            pair. Every number, including both halves of a pair, is divided
            by 255.
        prev: the ramp (already on the 0-1 scale) to show when ``line`` is
            blank.

    Returns:
        The parsed list of floats/tuples, or ``prev`` itself when ``line``
        is blank; in that case ``prev`` is also printed, scaled back to
        0-255 and rounded to whole numbers.

    Raises:
        ValueError: if a token is not a valid number.
    """
    tokens = line.split()
    if not tokens:
        rendered = []
        for point in prev:
            if isinstance(point, tuple):
                first, second = point
                rendered.append(f"{round(first * 255)}:{round(second * 255)}")
            else:
                rendered.append(str(round(point * 255)))
        print(" ".join(rendered))
        return prev

    points = []
    for token in tokens:
        if ":" in token:
            fields = token.split(":")
            points.append((float(fields[0]) / 255, float(fields[1]) / 255))
        else:
            points.append(float(token) / 255)
    return points
|
|
|
|
def execute(line):
|
|
global r, g, b
|
|
match line.split()[0]:
|
|
case "r":
|
|
r = parse_ramp(line[1:].strip(), r)
|
|
case "g":
|
|
g = parse_ramp(line[1:].strip(), g)
|
|
case "b":
|
|
b = parse_ramp(line[1:].strip(), b)
|
|
case "c":
|
|
r = parse_ramp(line[1:].strip(), [0, 1])
|
|
g = r
|
|
b = r
|
|
case "R":
|
|
r = parse_ramp_int(line[1:].strip(), r)
|
|
case "G":
|
|
g = parse_ramp_int(line[1:].strip(), g)
|
|
case "B":
|
|
b = parse_ramp_int(line[1:].strip(), b)
|
|
case "C":
|
|
r = parse_ramp_int(line[1:].strip(), [0, 1])
|
|
g = r
|
|
b = r
|
|
case "commit":
|
|
for i, gamma in enumerate(registry.gammas):
|
|
if gamma.size:
|
|
fd = os.memfd_create(f"gamma{i}", os.MFD_CLOEXEC)
|
|
os.lseek(fd, 0, os.SEEK_SET)
|
|
data = bytearray((by for ramp in [r, g, b] for by in scale_ramp(ramp, gamma.size)))
|
|
os.write(fd, data)
|
|
os.truncate(fd, 3*2*gamma.size)
|
|
os.lseek(fd, 0, os.SEEK_SET)
|
|
gamma.set_gamma(fd)
|
|
case "hold":
|
|
execute("commit")
|
|
try:
|
|
while True:
|
|
sleep(60)
|
|
except:
|
|
exit()
|
|
|
|
# Every command-line argument is one command. Anything that looks like an
# option prints the usage text and exits instead.
for arg in sys.argv[1:]:
    if not arg.startswith("-"):
        execute(arg)
        continue
    print("""
wlrcolormess expects each argument or line from stdin to be one of the following:

commit - apply color ramps (automatically done after argument parsing)
hold - commit, then close stdin, but don't exit
[rgbRGB] - print current color ramps
[cC] - reset color ramps to default and print one
[rgbc] <ramp> - set ramp for one or all colors, range 0-1
[RGBC] <ramp> - set ramp for one or all colors, range 0-255

<ramp>: a whitespace-separated list of color values:
`0 1`, `0 0.2 1`, `0 0.5:0.2 1` for [rgbc].
`0 255`, `0 50 255`, `0 127.5:50 255` for [RGBC].
values larger than 1 or 255 will overflow.
""")
    exit()

# Apply whatever the arguments set up before reading commands from stdin.
execute("commit")
|
|
|
|
# Read commands from stdin, one per line, until end of input or Ctrl-C.
while True:
    try:
        line = input()
    except (EOFError, KeyboardInterrupt):
        break
    try:
        execute(line)
    except Exception:
        # Only ordinary errors are reported here. SystemExit (raised when
        # "hold" exits) and KeyboardInterrupt must get through, otherwise
        # "hold" would print "error" and keep reading instead of exiting.
        print("error")
|