first commit

This commit is contained in:
2025-06-13 23:47:44 +02:00
commit 373bf070a4
2860 changed files with 1663538 additions and 0 deletions

View File

@@ -0,0 +1,451 @@
#
# MarlinBinaryProtocol.py
# Supporting Firmware upload via USB/Serial, saving to the attached media.
#
import serial
import math
import time
from collections import deque
import threading
import sys
import datetime
import random
try:
import heatshrink2 as heatshrink
heatshrink_exists = True
except ImportError:
try:
import heatshrink
heatshrink_exists = True
except ImportError:
heatshrink_exists = False
def millis():
return time.perf_counter() * 1000
class TimeOut(object):
def __init__(self, milliseconds):
self.duration = milliseconds
self.reset()
def reset(self):
self.endtime = millis() + self.duration
def timedout(self):
return millis() > self.endtime
class ReadTimeout(Exception):
pass
class FatalError(Exception):
pass
class SycronisationError(Exception):
pass
class PayloadOverflow(Exception):
pass
class ConnectionLost(Exception):
pass
class Protocol(object):
device = None
baud = None
max_block_size = 0
port = None
block_size = 0
packet_transit = None
packet_status = None
packet_ping = None
errors = 0
packet_buffer = None
simulate_errors = 0
sync = 0
connected = False
syncronised = False
worker_thread = None
response_timeout = 1000
applications = []
responses = deque()
def __init__(self, device, baud, bsize, simerr, timeout):
print("pySerial Version:", serial.VERSION)
self.port = serial.Serial(device, baudrate = baud, write_timeout = 0, timeout = 1)
self.device = device
self.baud = baud
self.block_size = int(bsize)
self.simulate_errors = max(min(simerr, 1.0), 0.0)
self.connected = True
self.response_timeout = timeout
self.register(['ok', 'rs', 'ss', 'fe'], self.process_input)
self.worker_thread = threading.Thread(target=Protocol.receive_worker, args=(self,))
self.worker_thread.start()
def receive_worker(self):
while self.port.in_waiting:
self.port.reset_input_buffer()
def dispatch(data):
for tokens, callback in self.applications:
for token in tokens:
if token == data[:len(token)]:
callback((token, data[len(token):]))
return
def reconnect():
print("Reconnecting..")
self.port.close()
for x in range(10):
try:
if self.connected:
self.port = serial.Serial(self.device, baudrate = self.baud, write_timeout = 0, timeout = 1)
return
else:
print("Connection closed")
return
except:
time.sleep(1)
raise ConnectionLost()
while self.connected:
try:
data = self.port.readline().decode('utf8').rstrip()
if len(data):
#print(data)
dispatch(data)
except OSError:
reconnect()
except UnicodeDecodeError:
# dodgy client output or datastream corruption
self.port.reset_input_buffer()
def shutdown(self):
self.connected = False
self.worker_thread.join()
self.port.close()
def process_input(self, data):
#print(data)
self.responses.append(data)
def register(self, tokens, callback):
self.applications.append((tokens, callback))
def send(self, protocol, packet_type, data = bytearray()):
self.packet_transit = self.build_packet(protocol, packet_type, data)
self.packet_status = 0
self.transmit_attempt = 0
timeout = TimeOut(self.response_timeout * 20)
while self.packet_status == 0:
try:
if timeout.timedout():
raise ConnectionLost()
self.transmit_packet(self.packet_transit)
self.await_response()
except ReadTimeout:
self.errors += 1
#print("Packetloss detected..")
self.packet_transit = None
def await_response(self):
timeout = TimeOut(self.response_timeout)
while not len(self.responses):
time.sleep(0.00001)
if timeout.timedout():
raise ReadTimeout()
while len(self.responses):
token, data = self.responses.popleft()
switch = {'ok' : self.response_ok, 'rs': self.response_resend, 'ss' : self.response_stream_sync, 'fe' : self.response_fatal_error}
switch[token](data)
def send_ascii(self, data, send_and_forget = False):
self.packet_transit = bytearray(data, "utf8") + b'\n'
self.packet_status = 0
self.transmit_attempt = 0
timeout = TimeOut(self.response_timeout * 20)
while self.packet_status == 0:
try:
if timeout.timedout():
return
self.port.write(self.packet_transit)
if send_and_forget:
self.packet_status = 1
else:
self.await_response_ascii()
except ReadTimeout:
self.errors += 1
#print("Packetloss detected..")
except serial.SerialException:
return
self.packet_transit = None
def await_response_ascii(self):
timeout = TimeOut(self.response_timeout)
while not len(self.responses):
time.sleep(0.00001)
if timeout.timedout():
raise ReadTimeout()
token, data = self.responses.popleft()
self.packet_status = 1
def corrupt_array(self, data):
rid = random.randint(0, len(data) - 1)
data[rid] ^= 0xAA
return data
def transmit_packet(self, packet):
packet = bytearray(packet)
if (self.simulate_errors > 0 and random.random() > (1.0 - self.simulate_errors)):
if random.random() > 0.9:
#random data drop
start = random.randint(0, len(packet))
end = start + random.randint(1, 10)
packet = packet[:start] + packet[end:]
#print("Dropping {0} bytes".format(end - start))
else:
#random corruption
packet = self.corrupt_array(packet)
#print("Single byte corruption")
self.port.write(packet)
self.transmit_attempt += 1
def build_packet(self, protocol, packet_type, data = bytearray()):
PACKET_TOKEN = 0xB5AD
if len(data) > self.max_block_size:
raise PayloadOverflow()
packet_buffer = bytearray()
packet_buffer += self.pack_int8(self.sync) # 8bit sync id
packet_buffer += self.pack_int4_2(protocol, packet_type) # 4 bit protocol id, 4 bit packet type
packet_buffer += self.pack_int16(len(data)) # 16bit packet length
packet_buffer += self.pack_int16(self.build_checksum(packet_buffer)) # 16bit header checksum
if len(data):
packet_buffer += data
packet_buffer += self.pack_int16(self.build_checksum(packet_buffer))
packet_buffer = self.pack_int16(PACKET_TOKEN) + packet_buffer # 16bit start token, not included in checksum
return packet_buffer
# checksum 16 fletchers
def checksum(self, cs, value):
cs_low = (((cs & 0xFF) + value) % 255)
return ((((cs >> 8) + cs_low) % 255) << 8) | cs_low
def build_checksum(self, buffer):
cs = 0
for b in buffer:
cs = self.checksum(cs, b)
return cs
def pack_int32(self, value):
return value.to_bytes(4, byteorder='little')
def pack_int16(self, value):
return value.to_bytes(2, byteorder='little')
def pack_int8(self, value):
return value.to_bytes(1, byteorder='little')
def pack_int4_2(self, vh, vl):
value = ((vh & 0xF) << 4) | (vl & 0xF)
return value.to_bytes(1, byteorder='little')
def connect(self):
print("Connecting: Switching Marlin to Binary Protocol...")
self.send_ascii("M28B1")
self.send(0, 1)
def disconnect(self):
self.send(0, 2)
self.syncronised = False
def response_ok(self, data):
try:
packet_id = int(data)
except ValueError:
return
if packet_id != self.sync:
raise SycronisationError()
self.sync = (self.sync + 1) % 256
self.packet_status = 1
def response_resend(self, data):
packet_id = int(data)
self.errors += 1
if not self.syncronised:
print("Retrying syncronisation")
elif packet_id != self.sync:
raise SycronisationError()
def response_stream_sync(self, data):
sync, max_block_size, protocol_version = data.split(',')
self.sync = int(sync)
self.max_block_size = int(max_block_size)
self.block_size = self.max_block_size if self.max_block_size < self.block_size else self.block_size
self.protocol_version = protocol_version
self.packet_status = 1
self.syncronised = True
print("Connection synced [{0}], binary protocol version {1}, {2} byte payload buffer".format(self.sync, self.protocol_version, self.max_block_size))
def response_fatal_error(self, data):
raise FatalError()
class FileTransferProtocol(object):
protocol_id = 1
class Packet(object):
QUERY = 0
OPEN = 1
CLOSE = 2
WRITE = 3
ABORT = 4
responses = deque()
def __init__(self, protocol, timeout = None):
protocol.register(['PFT:success', 'PFT:version:', 'PFT:fail', 'PFT:busy', 'PFT:ioerror', 'PTF:invalid'], self.process_input)
self.protocol = protocol
self.response_timeout = timeout or protocol.response_timeout
def process_input(self, data):
#print(data)
self.responses.append(data)
def await_response(self, timeout = None):
timeout = TimeOut(timeout or self.response_timeout)
while not len(self.responses):
time.sleep(0.0001)
if timeout.timedout():
raise ReadTimeout()
return self.responses.popleft()
def connect(self):
self.protocol.send(FileTransferProtocol.protocol_id, FileTransferProtocol.Packet.QUERY)
token, data = self.await_response()
if token != 'PFT:version:':
return False
self.version, _, compression = data.split(':')
if compression != 'none':
algorithm, window, lookahead = compression.split(',')
self.compression = {'algorithm': algorithm, 'window': int(window), 'lookahead': int(lookahead)}
else:
self.compression = {'algorithm': 'none'}
print("File Transfer version: {0}, compression: {1}".format(self.version, self.compression['algorithm']))
def open(self, filename, compression, dummy):
payload = b'\1' if dummy else b'\0' # dummy transfer
payload += b'\1' if compression else b'\0' # payload compression
payload += bytearray(filename, 'utf8') + b'\0'# target filename + null terminator
timeout = TimeOut(5000)
token = None
self.protocol.send(FileTransferProtocol.protocol_id, FileTransferProtocol.Packet.OPEN, payload)
while token != 'PFT:success' and not timeout.timedout():
try:
token, data = self.await_response(1000)
if token == 'PFT:success':
print(filename,"opened")
return
elif token == 'PFT:busy':
print("Broken transfer detected, purging")
self.abort()
time.sleep(0.1)
self.protocol.send(FileTransferProtocol.protocol_id, FileTransferProtocol.Packet.OPEN, payload)
timeout.reset()
elif token == 'PFT:fail':
raise Exception("Can not open file on client")
except ReadTimeout:
pass
raise ReadTimeout()
def write(self, data):
self.protocol.send(FileTransferProtocol.protocol_id, FileTransferProtocol.Packet.WRITE, data)
def close(self):
self.protocol.send(FileTransferProtocol.protocol_id, FileTransferProtocol.Packet.CLOSE)
token, data = self.await_response(1000)
if token == 'PFT:success':
print("File closed")
return True
elif token == 'PFT:ioerror':
print("Client storage device IO error")
return False
elif token == 'PFT:invalid':
print("No open file")
return False
def abort(self):
self.protocol.send(FileTransferProtocol.protocol_id, FileTransferProtocol.Packet.ABORT)
token, data = self.await_response()
if token == 'PFT:success':
print("Transfer Aborted")
def copy(self, filename, dest_filename, compression, dummy):
self.connect()
has_heatshrink = heatshrink_exists and self.compression['algorithm'] == 'heatshrink'
if compression and not has_heatshrink:
hs = '2' if sys.version_info[0] > 2 else ''
print("Compression not supported by client. Use 'pip install heatshrink%s' to fix." % hs)
compression = False
data = open(filename, "rb").read()
filesize = len(data)
self.open(dest_filename, compression, dummy)
block_size = self.protocol.block_size
if compression:
data = heatshrink.encode(data, window_sz2=self.compression['window'], lookahead_sz2=self.compression['lookahead'])
cratio = filesize / len(data)
blocks = math.floor((len(data) + block_size - 1) / block_size)
kibs = 0
dump_pctg = 0
start_time = millis()
for i in range(blocks):
start = block_size * i
end = start + block_size
self.write(data[start:end])
kibs = (( (i+1) * block_size) / 1024) / (millis() + 1 - start_time) * 1000
if (i / blocks) >= dump_pctg:
print("\r{0:2.0f}% {1:4.2f}KiB/s {2} Errors: {3}".format((i / blocks) * 100, kibs, "[{0:4.2f}KiB/s]".format(kibs * cratio) if compression else "", self.protocol.errors), end='')
dump_pctg += 0.1
if self.protocol.errors > 0:
# Dump last status (errors may not be visible)
print("\r{0:2.0f}% {1:4.2f}KiB/s {2} Errors: {3} - Aborting...".format((i / blocks) * 100, kibs, "[{0:4.2f}KiB/s]".format(kibs * cratio) if compression else "", self.protocol.errors), end='')
print("") # New line to break the transfer speed line
self.close()
print("Transfer aborted due to protocol errors")
#raise Exception("Transfer aborted due to protocol errors")
return False
print("\r{0:2.0f}% {1:4.2f}KiB/s {2} Errors: {3}".format(100, kibs, "[{0:4.2f}KiB/s]".format(kibs * cratio) if compression else "", self.protocol.errors)) # no one likes transfers finishing at 99.8%
if not self.close():
print("Transfer failed")
return False
print("Transfer complete")
return True
class EchoProtocol(object):
def __init__(self, protocol):
protocol.register(['echo:'], self.process_input)
self.protocol = protocol
def process_input(self, data):
print(data)

View File

@@ -0,0 +1,304 @@
/**************************************\
* *
* OpenSCAD Mesh Display *
* by Thinkyhead - April 2017 *
* *
* Copy the grid output from Marlin, *
* paste below as shown, and use *
* OpenSCAD to see a visualization *
* of your mesh. *
* *
\**************************************/
$t = 0.15; // comment out during animation!
X = 0; Y = 1;
L = 0; R = 1; F = 2; B = 3;
//
// Sample Mesh - Replace with your own
//
measured_z = [
[ -1.20, -1.13, -1.09, -1.03, -1.19 ],
[ -1.16, -1.25, -1.27, -1.25, -1.08 ],
[ -1.13, -1.26, -1.39, -1.31, -1.18 ],
[ -1.09, -1.20, -1.26, -1.21, -1.18 ],
[ -1.13, -0.99, -1.03, -1.06, -1.32 ]
];
//
// An offset to add to all points in the mesh
//
zadjust = 0;
//
// Mesh characteristics
//
bed_size = [ 200, 200 ];
mesh_inset = [ 10, 10, 10, 10 ]; // L, F, R, B
mesh_bounds = [
[ mesh_inset[L], mesh_inset[F] ],
[ bed_size[X] - mesh_inset[R], bed_size[Y] - mesh_inset[B] ]
];
mesh_size = mesh_bounds[1] - mesh_bounds[0];
// NOTE: Marlin meshes already subtract the probe offset
NAN = 0; // Z to use for un-measured points
//
// Geometry
//
max_z_scale = 100; // Scale at Time 0.5
min_z_scale = 10; // Scale at Time 0.0 and 1.0
thickness = 0.5; // thickness of the mesh triangles
tesselation = 1; // levels of tesselation from 0-2
alternation = 2; // direction change modulus (try it)
//
// Appearance
//
show_plane = true;
show_labels = true;
show_coords = true;
arrow_length = 5;
label_font_lg = "Arial";
label_font_sm = "Arial";
mesh_color = [1,1,1,0.5];
plane_color = [0.4,0.6,0.9,0.6];
//================================================ Derive useful values
big_z = max_2D(measured_z,0);
lil_z = min_2D(measured_z,0);
mean_value = (big_z + lil_z) / 2.0;
mesh_points_y = len(measured_z);
mesh_points_x = len(measured_z[0]);
xspace = mesh_size[X] / (mesh_points_x - 1);
yspace = mesh_size[Y] / (mesh_points_y - 1);
// At $t=0 and $t=1 scale will be 100%
z_scale_factor = min_z_scale + (($t > 0.5) ? 1.0 - $t : $t) * (max_z_scale - min_z_scale) * 2;
//
// Min and max recursive functions for 1D and 2D arrays
// Return the smallest or largest value in the array
//
function some_1D(b,i) = (i<len(b)-1) ? (b[i] && some_1D(b,i+1)) : b[i] != 0;
function some_2D(a,j) = (j<len(a)-1) ? some_2D(a,j+1) : some_1D(a[j], 0);
function min_1D(b,i) = (i<len(b)-1) ? min(b[i], min_1D(b,i+1)) : b[i];
function min_2D(a,j) = (j<len(a)-1) ? min_2D(a,j+1) : min_1D(a[j], 0);
function max_1D(b,i) = (i<len(b)-1) ? max(b[i], max_1D(b,i+1)) : b[i];
function max_2D(a,j) = (j<len(a)-1) ? max_2D(a,j+1) : max_1D(a[j], 0);
//
// Get the corner probe points of a grid square.
//
// Input : x,y grid indexes
// Output : An array of the 4 corner points
//
function grid_square(x,y) = [
[x * xspace, y * yspace, z_scale_factor * (measured_z[y][x] - mean_value)],
[x * xspace, (y+1) * yspace, z_scale_factor * (measured_z[y+1][x] - mean_value)],
[(x+1) * xspace, (y+1) * yspace, z_scale_factor * (measured_z[y+1][x+1] - mean_value)],
[(x+1) * xspace, y * yspace, z_scale_factor * (measured_z[y][x+1] - mean_value)]
];
// The corner point of a grid square with Z centered on the mean
function pos(x,y,z) = [x * xspace, y * yspace, z_scale_factor * (z - mean_value)];
//
// Draw the point markers and labels
//
module point_markers(show_home=true) {
// Mark the home position 0,0
if (show_home)
translate([1,1]) color([0,0,0,0.25])
cylinder(r=1, h=z_scale_factor, center=true);
for (x=[0:mesh_points_x-1], y=[0:mesh_points_y-1]) {
z = measured_z[y][x] - zadjust;
down = z < mean_value;
xyz = pos(x, y, z);
translate([ xyz[0], xyz[1] ]) {
// Show the XY as well as the Z!
if (show_coords) {
color("black")
translate([0,0,0.5]) {
$fn=8;
rotate([0,0]) {
posx = floor(mesh_bounds[0][X] + x * xspace);
posy = floor(mesh_bounds[0][Y] + y * yspace);
text(str(posx, ",", posy), 2, label_font_sm, halign="center", valign="center");
}
}
}
translate([ 0, 0, xyz[2] ]) {
// Label each point with the Z
v = z - mean_value;
if (show_labels) {
color(abs(v) < 0.1 ? [0,0.5,0] : [0.25,0,0])
translate([0,0,down?-10:10]) {
$fn=8;
rotate([90,0])
text(str(z), 6, label_font_lg, halign="center", valign="center");
if (v)
translate([0,0,down?-6:6]) rotate([90,0])
text(str(down || !v ? "" : "+", v), 3, label_font_sm, halign="center", valign="center");
}
}
// Show an arrow pointing up or down
if (v) {
rotate([0, down ? 180 : 0]) translate([0,0,-1])
cylinder(
r1=0.5,
r2=0.1,
h=arrow_length, $fn=12, center=1
);
}
else
color([1,0,1,0.4]) sphere(r=1.0, $fn=20, center=1);
}
}
}
}
//
// Split a square on the diagonal into
// two triangles and render them.
//
// s : a square
// alt : a flag to split on the other diagonal
//
module tesselated_square(s, alt=false) {
add = [0,0,thickness];
p1 = [
s[0], s[1], s[2], s[3],
s[0]+add, s[1]+add, s[2]+add, s[3]+add
];
f1 = alt
? [ [0,1,3], [4,5,1,0], [4,7,5], [5,7,3,1], [7,4,0,3] ]
: [ [0,1,2], [4,5,1,0], [4,6,5], [5,6,2,1], [6,4,0,2] ];
f2 = alt
? [ [1,2,3], [5,6,2,1], [5,6,7], [6,7,3,2], [7,5,1,3] ]
: [ [0,2,3], [4,6,2,0], [4,7,6], [6,7,3,2], [7,4,0,3] ];
// Use the other diagonal
polyhedron(points=p1, faces=f1);
polyhedron(points=p1, faces=f2);
}
/**
* The simplest mesh display
*/
module simple_mesh(show_plane=show_plane) {
if (show_plane) color(plane_color) cube([mesh_size[X], mesh_size[Y], thickness]);
color(mesh_color)
for (x=[0:mesh_points_x-2], y=[0:mesh_points_y-2])
tesselated_square(grid_square(x, y));
}
/**
* Subdivide the mesh into smaller squares.
*/
module bilinear_mesh(show_plane=show_plane,tesselation=tesselation) {
if (show_plane) color(plane_color) translate([-5,-5]) cube([mesh_size[X]+10, mesh_size[Y]+10, thickness]);
if (some_2D(measured_z, 0)) {
tesselation = tesselation % 4;
color(mesh_color)
for (x=[0:mesh_points_x-2], y=[0:mesh_points_y-2]) {
square = grid_square(x, y);
if (tesselation < 1) {
tesselated_square(square,(x%alternation)-(y%alternation));
}
else {
subdiv_4 = subdivided_square(square);
if (tesselation < 2) {
for (i=[0:3]) tesselated_square(subdiv_4[i],i%alternation);
}
else {
for (i=[0:3]) {
subdiv_16 = subdivided_square(subdiv_4[i]);
if (tesselation < 3) {
for (j=[0:3]) tesselated_square(subdiv_16[j],j%alternation);
}
else {
for (j=[0:3]) {
subdiv_64 = subdivided_square(subdiv_16[j]);
if (tesselation < 4) {
for (k=[0:3]) tesselated_square(subdiv_64[k]);
}
}
}
}
}
}
}
}
}
//
// Subdivision helpers
//
function ctrz(a) = (a[0][2]+a[1][2]+a[3][2]+a[2][2])/4;
function avgx(a,i) = (a[i][0]+a[(i+1)%4][0])/2;
function avgy(a,i) = (a[i][1]+a[(i+1)%4][1])/2;
function avgz(a,i) = (a[i][2]+a[(i+1)%4][2])/2;
//
// Convert one square into 4, applying bilinear averaging
//
// Input : 1 square (4 points)
// Output : An array of 4 squares
//
function subdivided_square(a) = [
[ // SW square
a[0], // SW
[a[0][0],avgy(a,0),avgz(a,0)], // CW
[avgx(a,1),avgy(a,0),ctrz(a)], // CC
[avgx(a,1),a[0][1],avgz(a,3)] // SC
],
[ // NW square
[a[0][0],avgy(a,0),avgz(a,0)], // CW
a[1], // NW
[avgx(a,1),a[1][1],avgz(a,1)], // NC
[avgx(a,1),avgy(a,0),ctrz(a)] // CC
],
[ // NE square
[avgx(a,1),avgy(a,0),ctrz(a)], // CC
[avgx(a,1),a[1][1],avgz(a,1)], // NC
a[2], // NE
[a[2][0],avgy(a,0),avgz(a,2)] // CE
],
[ // SE square
[avgx(a,1),a[0][1],avgz(a,3)], // SC
[avgx(a,1),avgy(a,0),ctrz(a)], // CC
[a[2][0],avgy(a,0),avgz(a,2)], // CE
a[3] // SE
]
];
//================================================ Run the plan
translate([-mesh_size[X] / 2, -mesh_size[Y] / 2]) {
$fn = 12;
point_markers();
bilinear_mesh();
}

View File

View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python
from __future__ import print_function
from __future__ import division
""" Generate the stepper delay lookup table for Marlin firmware. """
import argparse
__author__ = "Ben Gamari <bgamari@gmail.com>"
__copyright__ = "Copyright 2012, Ben Gamari"
__license__ = "GPL"
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-f', '--cpu-freq', type=int, default=16, help='CPU clockrate in MHz (default=16)')
parser.add_argument('-d', '--divider', type=int, default=8, help='Timer/counter pre-scale divider (default=8)')
args = parser.parse_args()
cpu_freq = args.cpu_freq * 1000000
timer_freq = cpu_freq / args.divider
print("#ifndef SPEED_LOOKUPTABLE_H")
print("#define SPEED_LOOKUPTABLE_H")
print()
print('#include "MarlinCore.h"')
print()
print("const uint16_t speed_lookuptable_fast[256][2] PROGMEM = {")
a = [ timer_freq / ((i*256)+(args.cpu_freq*2)) for i in range(256) ]
b = [ a[i] - a[i+1] for i in range(255) ]
b.append(b[-1])
for i in range(32):
print(" ", end=' ')
for j in range(8):
print("{%d, %d}," % (a[8*i+j], b[8*i+j]), end=' ')
print()
print("};")
print()
print("const uint16_t speed_lookuptable_slow[256][2] PROGMEM = {")
a = [ timer_freq / ((i*8)+(args.cpu_freq*2)) for i in range(256) ]
b = [ a[i] - a[i+1] for i in range(255) ]
b.append(b[-1])
for i in range(32):
print(" ", end=' ')
for j in range(8):
print("{%d, %d}," % (a[8*i+j], b[8*i+j]), end=' ')
print()
print("};")
print()
print("#endif")

View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python
"""Thermistor Value Lookup Table Generator
Generates lookup to temperature values for use in a microcontroller in C format based on:
https://en.wikipedia.org/wiki/Steinhart-Hart_equation
The main use is for Arduino programs that read data from the circuit board described here:
https://reprap.org/wiki/Temperature_Sensor_v2.0
Usage: python createTemperatureLookupMarlin.py [options]
Options:
-h, --help show this help
--rp=... pull-up resistor
--t1=ttt:rrr low temperature temperature:resistance point (around 25 degC)
--t2=ttt:rrr middle temperature temperature:resistance point (around 150 degC)
--t3=ttt:rrr high temperature temperature:resistance point (around 250 degC)
--num-temps=... the number of temperature points to calculate (default: 36)
"""
from __future__ import print_function
from __future__ import division
from math import *
import sys, getopt
"Constants"
ZERO = 273.15 # zero point of Kelvin scale
VADC = 5 # ADC voltage
VCC = 5 # supply voltage
ARES = pow(2,10) # 10 Bit ADC resolution
VSTEP = VADC / ARES # ADC voltage resolution
TMIN = 0 # lowest temperature in table
TMAX = 350 # highest temperature in table
class Thermistor:
"Class to do the thermistor maths"
def __init__(self, rp, t1, r1, t2, r2, t3, r3):
l1 = log(r1)
l2 = log(r2)
l3 = log(r3)
y1 = 1.0 / (t1 + ZERO) # adjust scale
y2 = 1.0 / (t2 + ZERO)
y3 = 1.0 / (t3 + ZERO)
x = (y2 - y1) / (l2 - l1)
y = (y3 - y1) / (l3 - l1)
c = (y - x) / ((l3 - l2) * (l1 + l2 + l3))
b = x - c * (l1**2 + l2**2 + l1*l2)
a = y1 - (b + l1**2 *c)*l1
if c < 0:
print("//////////////////////////////////////////////////////////////////////////////////////")
print("// WARNING: Negative coefficient 'c'! Something may be wrong with the measurements! //")
print("//////////////////////////////////////////////////////////////////////////////////////")
c = -c
self.c1 = a # Steinhart-Hart coefficients
self.c2 = b
self.c3 = c
self.rp = rp # pull-up resistance
def resol(self, adc):
"Convert ADC reading into a resolution"
res = self.temp(adc)-self.temp(adc+1)
return res
def voltage(self, adc):
"Convert ADC reading into a Voltage"
return adc * VSTEP # convert the 10 bit ADC value to a voltage
def resist(self, adc):
"Convert ADC reading into a resistance in Ohms"
r = self.rp * self.voltage(adc) / (VCC - self.voltage(adc)) # resistance of thermistor
return r
def temp(self, adc):
"Convert ADC reading into a temperature in Celsius"
l = log(self.resist(adc))
Tinv = self.c1 + self.c2*l + self.c3* l**3 # inverse temperature
return (1/Tinv) - ZERO # temperature
def adc(self, temp):
"Convert temperature into a ADC reading"
x = (self.c1 - (1.0 / (temp+ZERO))) / (2*self.c3)
y = sqrt((self.c2 / (3*self.c3))**3 + x**2)
r = exp((y-x)**(1.0/3) - (y+x)**(1.0/3))
return (r / (self.rp + r)) * ARES
def main(argv):
"Default values"
t1 = 25 # low temperature in Kelvin (25 degC)
r1 = 100000 # resistance at low temperature (10 kOhm)
t2 = 150 # middle temperature in Kelvin (150 degC)
r2 = 1641.9 # resistance at middle temperature (1.6 KOhm)
t3 = 250 # high temperature in Kelvin (250 degC)
r3 = 226.15 # resistance at high temperature (226.15 Ohm)
rp = 4700 # pull-up resistor (4.7 kOhm)
num_temps = 36 # number of entries for look-up table
try:
opts, args = getopt.getopt(argv, "h", ["help", "rp=", "t1=", "t2=", "t3=", "num-temps="])
except getopt.GetoptError as err:
print(str(err))
usage()
sys.exit(2)
for opt, arg in opts:
if opt in ("-h", "--help"):
usage()
sys.exit()
elif opt == "--rp":
rp = int(arg)
elif opt == "--t1":
arg = arg.split(':')
t1 = float(arg[0])
r1 = float(arg[1])
elif opt == "--t2":
arg = arg.split(':')
t2 = float(arg[0])
r2 = float(arg[1])
elif opt == "--t3":
arg = arg.split(':')
t3 = float(arg[0])
r3 = float(arg[1])
elif opt == "--num-temps":
num_temps = int(arg)
t = Thermistor(rp, t1, r1, t2, r2, t3, r3)
increment = int((ARES - 1) / (num_temps - 1))
step = int((TMIN - TMAX) / (num_temps - 1))
low_bound = t.temp(ARES - 1)
up_bound = t.temp(1)
min_temp = int(TMIN if TMIN > low_bound else low_bound)
max_temp = int(TMAX if TMAX < up_bound else up_bound)
temps = list(range(max_temp, TMIN + step, step))
print("// Thermistor lookup table for Marlin")
print("// ./createTemperatureLookupMarlin.py --rp=%s --t1=%s:%s --t2=%s:%s --t3=%s:%s --num-temps=%s" % (rp, t1, r1, t2, r2, t3, r3, num_temps))
print("// Steinhart-Hart Coefficients: a=%.15g, b=%.15g, c=%.15g " % (t.c1, t.c2, t.c3))
print("// Theoretical limits of thermistor: %.2f to %.2f degC" % (low_bound, up_bound))
print()
print("const short temptable[][2] PROGMEM = {")
for temp in temps:
adc = t.adc(temp)
print(" { OV(%7.2f), %4s }%s // v=%.3f\tr=%.3f\tres=%.3f degC/count" % (adc , temp, \
',' if temp != temps[-1] else ' ', \
t.voltage(adc), \
t.resist( adc), \
t.resol( adc) \
))
print("};")
def usage():
print(__doc__)
if __name__ == "__main__":
main(sys.argv[1:])

View File

@@ -0,0 +1,104 @@
#!/usr/bin/env bash
#
# findMissingTranslations.sh
#
# Locate all language strings needing an update based on English
#
# Usage: findMissingTranslations.sh [language codes]
#
# If no language codes are specified then all languages will be checked
#
langname() {
case "$1" in
an ) echo "Aragonese" ;; bg ) echo "Bulgarian" ;;
ca ) echo "Catalan" ;; cz ) echo "Czech" ;;
da ) echo "Danish" ;; de ) echo "German" ;;
el ) echo "Greek" ;; el_CY ) echo "Greek (Cyprus)" ;;
el_gr) echo "Greek (Greece)" ;; en ) echo "English" ;;
es ) echo "Spanish" ;; eu ) echo "Basque-Euskera" ;;
fi ) echo "Finnish" ;; fr ) echo "French" ;;
fr_na) echo "French (no accent)" ;; gl ) echo "Galician" ;;
hr ) echo "Croatian (Hrvatski)" ;; hu ) echo "Hungarian / Magyar" ;;
it ) echo "Italian" ;; jp_kana) echo "Japanese (Kana)" ;;
ko_KR) echo "Korean" ;; nl ) echo "Dutch" ;;
pl ) echo "Polish" ;; pt ) echo "Portuguese" ;;
pt_br) echo "Portuguese (Brazil)" ;; ro ) echo "Romanian" ;;
ru ) echo "Russian" ;; sk ) echo "Slovak" ;;
sv ) echo "Swedish" ;; tr ) echo "Turkish" ;;
uk ) echo "Ukrainian" ;; vi ) echo "Vietnamese" ;;
zh_CN) echo "Simplified Chinese" ;; zh_TW ) echo "Traditional Chinese" ;;
* ) echo "<unknown>" ;;
esac
}
LANGHOME="Marlin/src/lcd/language"
[ -d $LANGHOME ] && cd $LANGHOME
FILES=$(ls language_*.h | grep -v -E "(_en|_test)\.h" | sed -E 's/language_([^\.]+)\.h/\1/' | tr '\n' ' ')
# Get files matching the given arguments
TEST_LANGS=""
if [[ -n $@ ]]; then
for K in "$@"; do
for F in $FILES; do
[[ $F == $K ]] && TEST_LANGS+="$F "
done
done
[[ -z $TEST_LANGS ]] && { echo "No languages matching $@." ; exit 0 ; }
else
TEST_LANGS=$FILES
fi
echo "Finding all missing strings for $TEST_LANGS..."
WORD_LINES=() # Complete lines for all words (or, grep out of en at the end instead)
ALL_MISSING=() # All missing languages for each missing word
#NEED_WORDS=() # All missing words across all specified languages
WORD_COUNT=0
# Go through all strings in the English language file
# For each word, query all specified languages for the word
# If the word is missing, add its language to the list
for WORD in $(awk '/LSTR/{print $2}' language_en.h); do
# Skip MSG_MARLIN
[[ $WORD == "MSG_MARLIN" ]] && break
((WORD_COUNT++))
# Find all selected languages that lack the string
LANG_MISSING=" "
for LANG in $TEST_LANGS; do
if [[ $(grep -c -E "^ *LSTR +$WORD\b" language_${LANG}.h) -eq 0 ]]; then
INHERIT=$(awk '/using namespace/{print $3}' language_${LANG}.h | sed -E 's/Language_([a-zA-Z_]+)\s*;/\1/')
if [[ -z $INHERIT || $INHERIT == "en" ]]; then
LANG_MISSING+="$LANG "
elif [[ $(grep -c -E "^ *LSTR +$WORD\b" language_${INHERIT}.h) -eq 0 ]]; then
LANG_MISSING+="$LANG "
fi
fi
done
# For each word store all the missing languages
if [[ $LANG_MISSING != " " ]]; then
WORD_LINES+=("$(grep -m 1 -E "$WORD\b" language_en.h)")
ALL_MISSING+=("$LANG_MISSING")
#NEED_WORDS+=($WORD)
fi
done
echo
echo "${#WORD_LINES[@]} out of $WORD_COUNT LCD strings need translation"
for LANG in $TEST_LANGS; do
HED=0 ; IND=0
for WORDLANGS in "${ALL_MISSING[@]}"; do
# If the current word is missing from the current language then print it
if [[ $WORDLANGS =~ " $LANG " ]]; then
[[ $HED == 0 ]] && { echo ; echo "Missing strings for language_$LANG.h ($(langname $LANG)):" ; HED=1 ; }
echo "${WORD_LINES[$IND]}"
fi
((IND++))
done
done

View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python
# This file is for preprocessing G-code and the new G29 Auto bed leveling from Marlin
# It will analyze the first 2 layers and return the maximum size for this part
# Then it will be replaced with g29_keyword = ';MarlinG29Script' with the new G29 LRFB.
# The new file will be created in the same folder.
from __future__ import print_function
# Your G-code file/folder
folder = './'
my_file = 'test.gcode'
# this is the minimum of G1 instructions which should be between 2 different heights
min_g1 = 3
# maximum number of lines to parse, I don't want to parse the complete file
# only the first plane is we are interested in
max_g1 = 100000000
# g29 keyword
g29_keyword = 'g29'
g29_keyword = g29_keyword.upper()
# output filename
output_file = folder + 'g29_' + my_file
# input filename
input_file = folder + my_file
# minimum scan size
min_size = 40
probing_points = 3 # points x points
# other stuff
min_x = 500
min_y = min_x
max_x = -500
max_y = max_x
last_z = 0.001
layer = 0
lines_of_g1 = 0
gcode = []
# return only g1-lines
def has_g1(line):
return line[:2].upper() == "G1"
# find position in g1 (x,y,z)
def find_axis(line, axis):
found = False
number = ""
for char in line:
if found:
if char == ".":
number += char
elif char == "-":
number += char
else:
try:
int(char)
number += char
except ValueError:
break
else:
found = char.upper() == axis.upper()
try:
return float(number)
except ValueError:
return None
# save the min or max-values for each axis
def set_mima(line):
global min_x, max_x, min_y, max_y, last_z
current_x = find_axis(line, 'x')
current_y = find_axis(line, 'y')
if current_x is not None:
min_x = min(current_x, min_x)
max_x = max(current_x, max_x)
if current_y is not None:
min_y = min(current_y, min_y)
max_y = max(current_y, max_y)
return min_x, max_x, min_y, max_y
# find z in the code and return it
def find_z(gcode, start_at_line=0):
for i in range(start_at_line, len(gcode)):
my_z = find_axis(gcode[i], 'Z')
if my_z is not None:
return my_z, i
def z_parse(gcode, start_at_line=0, end_at_line=0):
i = start_at_line
all_z = []
line_between_z = []
z_at_line = []
# last_z = 0
last_i = -1
while len(gcode) > i:
try:
z, i = find_z(gcode, i + 1)
except TypeError:
break
all_z.append(z)
z_at_line.append(i)
temp_line = i - last_i -1
line_between_z.append(i - last_i - 1)
# last_z = z
last_i = i
if 0 < end_at_line <= i or temp_line >= min_g1:
# print('break at line {} at height {}'.format(i, z))
break
line_between_z = line_between_z[1:]
return all_z, line_between_z, z_at_line
# get the lines which should be the first layer
def get_lines(gcode, minimum):
i = 0
all_z, line_between_z, z_at_line = z_parse(gcode, end_at_line=max_g1)
for count in line_between_z:
i += 1
if count > minimum:
# print('layer: {}:{}'.format(z_at_line[i-1], z_at_line[i]))
return z_at_line[i - 1], z_at_line[i]
with open(input_file, 'r') as file:
lines = 0
for line in file:
lines += 1
if lines > 1000:
break
if has_g1(line):
gcode.append(line)
file.close()
start, end = get_lines(gcode, min_g1)
for i in range(start, end):
set_mima(gcode[i])
print('x_min:{} x_max:{}\ny_min:{} y_max:{}'.format(min_x, max_x, min_y, max_y))
# resize min/max - values for minimum scan
if max_x - min_x < min_size:
offset_x = int((min_size - (max_x - min_x)) / 2 + 0.5) # int round up
# print('min_x! with {}'.format(int(max_x - min_x)))
min_x = int(min_x) - offset_x
max_x = int(max_x) + offset_x
if max_y - min_y < min_size:
offset_y = int((min_size - (max_y - min_y)) / 2 + 0.5) # int round up
# print('min_y! with {}'.format(int(max_y - min_y)))
min_y = int(min_y) - offset_y
max_y = int(max_y) + offset_y
new_command = 'G29 L{0} R{1} F{2} B{3} P{4}\n'.format(min_x,
max_x,
min_y,
max_y,
probing_points)
out_file = open(output_file, 'w')
in_file = open(input_file, 'r')
for line in in_file:
if line[:len(g29_keyword)].upper() == g29_keyword:
out_file.write(new_command)
print('write G29')
else:
out_file.write(line)
file.close()
out_file.close()
print('auto G29 finished')

View File

@@ -0,0 +1,15 @@
#!/usr/bin/env python
"""
Extract the builds used in Github CI, so that we can run them locally
"""
import yaml
# Set the yaml file to parse
yaml_file = '.github/workflows/ci-build-tests.yml'
# Parse the yaml file, and load it into a dictionary (github_configuration)
with open(yaml_file) as f:
github_configuration = yaml.safe_load(f)
# Print out the test platforms
print(' '.join(github_configuration['jobs']['test_builds']['strategy']['matrix']['test-platform']))

View File

@@ -0,0 +1,75 @@
#!/usr/bin/env python3
#
# Marlin 3D Printer Firmware
# Copyright (c) 2021 MarlinFirmware [https://github.com/MarlinFirmware/Marlin]
#
# Based on Sprinter and grbl.
# Copyright (c) 2011 Camiel Gubbels / Erik van der Zalm
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Generate Marlin TFT Images from bitmaps/PNG/JPG
import sys, struct
from PIL import Image
def image2bin(image, output_file, transparency):
w, h = image.size[0], image.size[1]
print(f"Converting image with dimensions {w}x{h}...")
if output_file.endswith(('.c', '.cpp')):
is_cpp = True
row_sp, item_sp = (" ", "") if w >= 480 else (" ", " ")
row_end, data_end = "\n", "};\n"
f = open(output_file, 'wt')
f.write("const uint16_t image[%d] = {\n" % (h * w))
else:
is_cpp = False
row_sp, row_end, data_end = b"", b"", b""
f = open(output_file, 'wb')
tcolor, got_tcolor = 0, False
pixs = image.load()
for y in range(h):
f.write(row_sp)
for x in range(w):
R = pixs[x, y][0] >> 3
G = pixs[x, y][1] >> 2
B = pixs[x, y][2] >> 3
rgb = (R << 11) | (G << 5) | B
if transparency:
if not got_tcolor:
got_tcolor = True
tcolor = rgb # First pixel color is transparent
if rgb == tcolor: rgb = 1 # "color 1" is transparent
if is_cpp:
strHex = item_sp + "0x{0:04X},".format(rgb)
f.write(strHex)
else:
f.write(struct.pack("B", (rgb & 0xFF)))
f.write(struct.pack("B", (rgb >> 8) & 0xFF))
f.write(row_end)
f.write(data_end)
f.close()
if len(sys.argv) <= 2:
print("Utility to export a image in Marlin TFT friendly format.")
print("It will dump a raw bin RGB565 image or create a CPP file with an array of 16 bit image pixels.")
print("Usage: gen-tft-image.py INPUT_IMAGE.(png|bmp|jpg) OUTPUT_FILE.(cpp|bin) [--transparency]")
print("Authors: rhapsodyv, thinkyhead")
exit(1)
transparency = len(sys.argv) > 3 and sys.argv[3] == "--transparency"
output_img = sys.argv[2]
img = Image.open(sys.argv[1])
image2bin(img, output_img, transparency)

View File

@@ -0,0 +1,155 @@
#!/usr/bin/env python3
'''
languageExport.py [--single]
Export LCD language strings to CSV files for easier translation.
Use languageImport.py to import CSV into the language files.
Use --single to export all languages to a single CSV file.
'''
import re
from pathlib import Path
from sys import argv
from languageUtil import namebyid
LANGHOME = "Marlin/src/lcd/language"
# Write multiple sheets if true, otherwise write one giant sheet
MULTISHEET = '--single' not in argv[1:]
OUTDIR = 'out-csv'
# Check for the path to the language files
if not Path(LANGHOME).is_dir():
print("Error: Couldn't find the '%s' directory." % LANGHOME)
print("Edit LANGHOME or cd to the root of the repo before running.")
exit(1)
# A limit just for testing
LIMIT = 0
# A dictionary to contain strings for each language.
# Init with 'en' so English will always be first.
language_strings = { 'en': {} }
# A dictionary to contain all distinct LCD string names
names = {}
# Get all "language_*.h" files
langfiles = sorted(list(Path(LANGHOME).glob('language_*.h')))
# Read each language file
for langfile in langfiles:
# Get the language code from the filename
langcode = langfile.name.replace('language_', '').replace('.h', '')
# Skip 'test' and any others that we don't want
if langcode in ['test']: continue
# Open the file
f = open(langfile, 'r', encoding='utf-8')
if not f: continue
# Flags to indicate a wide or tall section
wideflag, tallflag = False, False
# A counter for the number of strings in the file
stringcount = 0
# A dictionary to hold all the strings
strings = { 'narrow': {}, 'wide': {}, 'tall': {} }
# Read each line in the file
for line in f:
# Clean up the line for easier parsing
line = line.split("//")[0].strip()
if line.endswith(';'): line = line[:-1].strip()
# Check for wide or tall sections, assume no complicated nesting
if line.startswith("#endif") or line.startswith("#else"):
wideflag, tallflag = False, False
elif re.match(r'#if.*WIDTH\s*>=?\s*2[01].*', line): wideflag = True
elif re.match(r'#if.*LCD_HEIGHT\s*>=?\s*4.*', line): tallflag = True
# For string-defining lines capture the string data
match = re.match(r'LSTR\s+([A-Z0-9_]+)\s*=\s*(.+)\s*', line)
if match:
# Name and quote-sanitized value
name, value = match.group(1), match.group(2).replace('\\"', '$$$')
# Remove all _UxGT wrappers from the value in a non-greedy way
value = re.sub(r'_UxGT\((".*?")\)', r'\1', value)
# Multi-line strings get one or more bars | for identification
multiline = 0
multimatch = re.match(r'.*MSG_(\d)_LINE\s*\(\s*(.+?)\s*\).*', value)
if multimatch:
multiline = int(multimatch.group(1))
value = '|' + re.sub(r'"\s*,\s*"', '|', multimatch.group(2))
# Wrap inline defines in parentheses
value = re.sub(r' *([A-Z0-9]+_[A-Z0-9_]+) *', r'(\1)', value)
# Remove quotes around strings
value = re.sub(r'"(.*?)"', r'\1', value).replace('$$$', '""')
# Store all unique names as dictionary keys
names[name] = 1
# Store the string as narrow or wide
strings['tall' if tallflag else 'wide' if wideflag else 'narrow'][name] = value
# Increment the string counter
stringcount += 1
# Break for testing
if LIMIT and stringcount >= LIMIT: break
# Close the file
f.close()
# Store the array in the dict
language_strings[langcode] = strings
# Get the language codes from the dictionary
langcodes = list(language_strings.keys())
# Print the array
#print(language_strings)
# Report the total number of unique strings
print("Found %s distinct LCD strings." % len(names))
# Write a single language entry to the CSV file with narrow, wide, and tall strings
def write_csv_lang(f, strings, name):
f.write(',')
if name in strings['narrow']: f.write('"%s"' % strings['narrow'][name])
f.write(',')
if name in strings['wide']: f.write('"%s"' % strings['wide'][name])
f.write(',')
if name in strings['tall']: f.write('"%s"' % strings['tall'][name])
if MULTISHEET:
#
# Export a separate sheet for each language
#
Path.mkdir(Path(OUTDIR), exist_ok=True)
for lang in langcodes:
with open("%s/language_%s.csv" % (OUTDIR, lang), 'w', encoding='utf-8') as f:
lname = lang + ' ' + namebyid(lang)
header = ['name', lname, lname + ' (wide)', lname + ' (tall)']
f.write('"' + '","'.join(header) + '"\n')
for name in names.keys():
f.write('"' + name + '"')
write_csv_lang(f, language_strings[lang], name)
f.write('\n')
else:
#
# Export one large sheet containing all languages
#
with open("languages.csv", 'w', encoding='utf-8') as f:
header = ['name']
for lang in langcodes:
lname = lang + ' ' + namebyid(lang)
header += [lname, lname + ' (wide)', lname + ' (tall)']
f.write('"' + '","'.join(header) + '"\n')
for name in names.keys():
f.write('"' + name + '"')
for lang in langcodes: write_csv_lang(f, language_strings[lang], name)
f.write('\n')

View File

@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""
languageImport.py
Import LCD language strings from a CSV file or Google Sheets
and write Marlin LCD language files based on the data.
Use languageExport.py to export CSV from the language files.
Google Sheets Link:
https://docs.google.com/spreadsheets/d/12yiy-kS84ajKFm7oQIrC4CF8ZWeu9pAR4zrgxH4ruk4/edit#gid=84528699
TODO: Use the defines and comments above the namespace from existing language files.
Get the 'constexpr uint8_t CHARSIZE' from existing language files.
Get the correct 'using namespace' for languages that don't inherit from English.
"""
import sys, re, requests, csv, datetime
#from languageUtil import namebyid
LANGHOME = "Marlin/src/lcd/language"
OUTDIR = 'out-language'
# Get the file path from the command line
FILEPATH = sys.argv[1] if len(sys.argv) > 1 else None
download = FILEPATH == 'download'
if not FILEPATH or download:
SHEETID = "12yiy-kS84ajKFm7oQIrC4CF8ZWeu9pAR4zrgxH4ruk4"
FILEPATH = 'https://docs.google.com/spreadsheet/ccc?key=%s&output=csv' % SHEETID
if FILEPATH.startswith('http'):
response = requests.get(FILEPATH)
assert response.status_code == 200, 'GET failed for %s' % FILEPATH
csvdata = response.content.decode('utf-8')
else:
if not FILEPATH.endswith('.csv'): FILEPATH += '.csv'
with open(FILEPATH, 'r', encoding='utf-8') as f: csvdata = f.read()
if not csvdata:
print("Error: couldn't read CSV data from %s" % FILEPATH)
exit(1)
if download:
DLNAME = sys.argv[2] if len(sys.argv) > 2 else 'languages.csv'
if not DLNAME.endswith('.csv'): DLNAME += '.csv'
with open(DLNAME, 'w', encoding='utf-8') as f: f.write(csvdata)
print("Downloaded %s from %s" % (DLNAME, FILEPATH))
exit(0)
lines = csvdata.splitlines()
print(lines)
reader = csv.reader(lines, delimiter=',')
gothead = False
columns = ['']
numcols = 0
strings_per_lang = {}
for row in reader:
if not gothead:
gothead = True
numcols = len(row)
if row[0] != 'name':
print('Error: first column should be "name"')
exit(1)
# The rest of the columns are language codes and names
for i in range(1, numcols):
elms = row[i].split(' ')
lang = elms[0]
style = ('Wide' if elms[-1] == '(wide)' else 'Tall' if elms[-1] == '(tall)' else 'Narrow')
columns.append({ 'lang': lang, 'style': style })
if not lang in strings_per_lang: strings_per_lang[lang] = {}
if not style in strings_per_lang[lang]: strings_per_lang[lang][style] = {}
continue
# Add the named string for all the included languages
name = row[0]
for i in range(1, numcols):
str_key = row[i]
if str_key:
col = columns[i]
strings_per_lang[col['lang']][col['style']][name] = str_key
# Create a folder for the imported language outfiles
from pathlib import Path
Path.mkdir(Path(OUTDIR), exist_ok=True)
FILEHEADER = '''
/**
* Marlin 3D Printer Firmware
* Copyright (c) 2023 MarlinFirmware [https://github.com/MarlinFirmware/Marlin]
*
* Based on Sprinter and grbl.
* Copyright (c) 2011 Camiel Gubbels / Erik van der Zalm
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#pragma once
/**
* %s
*
* LCD Menu Messages
* See also https://marlinfw.org/docs/development/lcd_language.html
*
* Substitutions are applied for the following characters when used in menu items titles:
*
* $ displays an inserted string
* { displays '0'....'10' for indexes 0 - 10
* ~ displays '1'....'11' for indexes 0 - 10
* * displays 'E1'...'E11' for indexes 0 - 10 (By default. Uses LCD_FIRST_TOOL)
* @ displays an axis name such as XYZUVW, or E for an extruder
*/
'''
# Iterate over the languages which correspond to the columns
# The columns are assumed to be grouped by language in the order Narrow, Wide, Tall
# TODO: Go through lang only, then impose the order Narrow, Wide, Tall.
# So if something is missing or out of order everything still gets built correctly.
f = None
gotlang = {}
for i in range(1, numcols):
#if i > 6: break # Testing
col = columns[i]
lang, style = col['lang'], col['style']
# If we haven't already opened a file for this language, do so now
if not lang in gotlang:
gotlang[lang] = {}
if f: f.close()
fn = "%s/language_%s.h" % (OUTDIR, lang)
f = open(fn, 'w', encoding='utf-8')
if not f:
print("Failed to open %s." % fn)
exit(1)
# Write the opening header for the new language file
#f.write(FILEHEADER % namebyid(lang))
f.write('/**\n * Imported from %s on %s at %s\n */\n' % (FILEPATH, datetime.date.today(), datetime.datetime.now().strftime("%H:%M:%S")))
# Start a namespace for the language and style
f.write('\nnamespace Language%s_%s {\n' % (style, lang))
# Wide and tall namespaces inherit from the others
if style == 'Wide':
f.write(' using namespace LanguageNarrow_%s;\n' % lang)
f.write(' #if LCD_WIDTH >= 20 || HAS_DWIN_E3V2\n')
elif style == 'Tall':
f.write(' using namespace LanguageWide_%s;\n' % lang)
f.write(' #if LCD_HEIGHT >= 4\n')
elif lang != 'en':
f.write(' using namespace Language_en; // Inherit undefined strings from English\n')
# Formatting for the lines
indent = ' ' if style == 'Narrow' else ' '
width = 34 if style == 'Narrow' else 32
lstr_fmt = '%sLSTR %%-%ds = %%s;%%s\n' % (indent, width)
# Emit all the strings for this language and style
for name in strings_per_lang[lang][style].keys():
# Get the raw string value
val = strings_per_lang[lang][style][name]
# Count the number of bars
if val.startswith('|'):
bars = val.count('|')
val = val[1:]
else:
bars = 0
# Escape backslashes, substitute quotes, and wrap in _UxGT("...")
val = '_UxGT("%s")' % val.replace('\\', '\\\\').replace('"', '$$$')
# Move named references outside of the macro
val = re.sub(r'\(([A-Z0-9]+_[A-Z0-9_]+)\)', r'") \1 _UxGT("', val)
# Remove all empty _UxGT("") that result from the above
val = re.sub(r'\s*_UxGT\(""\)\s*', '', val)
# No wrapper needed for just spaces
val = re.sub(r'_UxGT\((" +")\)', r'\1', val)
# Multi-line strings start with a bar...
if bars:
# Wrap the string in MSG_#_LINE(...) and split on bars
val = re.sub(r'^_UxGT\((.+)\)', r'_UxGT(MSG_%s_LINE(\1))' % bars, val)
val = val.replace('|', '", "')
# Restore quotes inside the string
val = val.replace('$$$', '\\"')
# Add a comment with the English string for reference
comm = ''
if lang != 'en' and 'en' in strings_per_lang:
en = strings_per_lang['en']
if name in en[style]: str_key = en[style][name]
elif name in en['Narrow']: str_key = en['Narrow'][name]
if str_key:
cfmt = '%%%ss// %%s' % (50 - len(val) if len(val) < 50 else 1)
comm = cfmt % (' ', str_key)
# Write out the string definition
f.write(lstr_fmt % (name, val, comm))
if style == 'Wide' or style == 'Tall': f.write(' #endif\n')
f.write('}\n') # End namespace
# Assume the 'Tall' namespace comes last
if style == 'Tall': f.write('\nnamespace Language_%s {\n using namespace LanguageTall_%s;\n}\n' % (lang, lang))
# Close the last-opened output file
if f: f.close()

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
#
# languageUtil.py
#
# A dictionary to contain language names
LANGNAME = {
'an': "Aragonese",
'bg': "Bulgarian",
'ca': "Catalan",
'cz': "Czech",
'da': "Danish",
'de': "German",
'el': "Greek", 'el_CY': "Greek (Cyprus)", 'el_gr': "Greek (Greece)",
'en': "English",
'es': "Spanish",
'eu': "Basque-Euskera",
'fi': "Finnish",
'fr': "French", 'fr_na': "French (no accent)",
'gl': "Galician",
'hr': "Croatian (Hrvatski)",
'hu': "Hungarian / Magyar",
'it': "Italian",
'jp_kana': "Japanese (Kana)",
'ko_KR': "Korean",
'nl': "Dutch",
'pl': "Polish",
'pt': "Portuguese", 'pt_br': "Portuguese (Brazil)",
'ro': "Romanian",
'ru': "Russian",
'sk': "Slovak",
'sv': "Swedish",
'tr': "Turkish",
'uk': "Ukrainian",
'vi': "Vietnamese",
'zh_CN': "Simplified Chinese", 'zh_TW': "Traditional Chinese"
}
def namebyid(id):
if id in LANGNAME: return LANGNAME[id]
return '<unknown>'

View File

@@ -0,0 +1,280 @@
#!/usr/bin/env python3
#
# Formatter script for pins_MYPINS.h files
#
# Usage: pinsformat.py [infile] [outfile]
#
# With no parameters convert STDIN to STDOUT
#
import sys, re
do_log = False
def logmsg(msg, line):
if do_log: print(msg, line)
col_comment = 50
# String lpad / rpad
def lpad(astr, fill, c=' '):
if not fill: return astr
need = fill - len(astr)
return astr if need <= 0 else (need * c) + astr
def rpad(astr, fill, c=' '):
if not fill: return astr
need = fill - len(astr)
return astr if need <= 0 else astr + (need * c)
# Concatenate a string, adding a space if necessary
# to avoid merging two words
def concat_with_space(s1, s2):
if not s1.endswith(' ') and not s2.startswith(' '):
s1 += ' '
return s1 + s2
# Pin patterns
mpatt = [ r'-?\d{1,3}', r'P[A-I]\d+', r'P\d_\d+', r'Pin[A-Z]\d\b' ]
mstr = '|'.join(mpatt)
mexpr = [ re.compile(f'^{m}$') for m in mpatt ]
# Corrsponding padding for each pattern
ppad = [ 3, 4, 5, 5 ]
# Match a define line
definePinPatt = re.compile(rf'^\s*(//)?#define\s+[A-Z_][A-Z0-9_]+?_PIN\s+({mstr})\s*(//.*)?$')
def format_pins(argv):
src_file = 'stdin'
dst_file = None
scnt = 0
for arg in argv:
if arg == '-v':
global do_log
do_log = True
elif scnt == 0:
# Get a source file if specified. Default destination is the same file
src_file = dst_file = arg
scnt += 1
elif scnt == 1:
# Get destination file if specified
dst_file = arg
scnt += 1
# No text to process yet
file_text = ''
if src_file == 'stdin':
# If no source file specified read from STDIN
file_text = sys.stdin.read()
else:
# Open and read the file src_file
with open(src_file, 'r') as rf: file_text = rf.read()
if len(file_text) == 0:
print('No text to process')
return
# Read from file or STDIN until it terminates
filtered = process_text(file_text)
if dst_file:
with open(dst_file, 'w') as wf: wf.write(filtered)
else:
print(filtered)
# Find the pin pattern so non-pin defines can be skipped
def get_pin_pattern(txt):
r = ''
m = 0
match_count = [ 0, 0, 0, 0 ]
# Find the most common matching pattern
match_threshold = 5
for line in txt.split('\n'):
r = definePinPatt.match(line)
if r == None: continue
ind = -1
for p in mexpr:
ind += 1
if not p.match(r[2]): continue
match_count[ind] += 1
if match_count[ind] >= match_threshold:
return { 'match': mpatt[ind], 'pad':ppad[ind] }
return None
def process_text(txt):
if len(txt) == 0: return '(no text)'
patt = get_pin_pattern(txt)
if patt == None: return txt
pmatch = patt['match']
pindefPatt = re.compile(rf'^(\s*(//)?#define)\s+([A-Z_][A-Z0-9_]+)\s+({pmatch})\s*(//.*)?$')
noPinPatt = re.compile(r'^(\s*(//)?#define)\s+([A-Z_][A-Z0-9_]+)\s+(-1)\s*(//.*)?$')
skipPatt1 = re.compile(r'^(\s*(//)?#define)\s+(AT90USB|USBCON|(BOARD|DAC|FLASH|HAS|IS|USE)_.+|.+_(ADDRESS|AVAILABLE|BAUDRATE|CLOCK|CONNECTION|DEFAULT|ERROR|EXTRUDERS|FREQ|ITEM|MKS_BASE_VERSION|MODULE|NAME|ONLY|ORIENTATION|PERIOD|RANGE|RATE|READ_RETRIES|SERIAL|SIZE|SPI|STATE|STEP|TIMER|VERSION))\s+(.+)\s*(//.*)?$')
skipPatt2 = re.compile(r'^(\s*(//)?#define)\s+([A-Z_][A-Z0-9_]+)\s+(0x[0-9A-Fa-f]+|\d+|.+[a-z].+)\s*(//.*)?$')
skipPatt3 = re.compile(r'^\s*#e(lse|ndif)\b.*$')
aliasPatt = re.compile(r'^(\s*(//)?#define)\s+([A-Z_][A-Z0-9_]+)\s+([A-Z_][A-Z0-9_()]+)\s*(//.*)?$')
switchPatt = re.compile(r'^(\s*(//)?#define)\s+([A-Z_][A-Z0-9_]+)\s*(//.*)?$')
undefPatt = re.compile(r'^(\s*(//)?#undef)\s+([A-Z_][A-Z0-9_]+)\s*(//.*)?$')
defPatt = re.compile(r'^(\s*(//)?#define)\s+([A-Z_][A-Z0-9_]+)\s+([-_\w]+)\s*(//.*)?$')
condPatt = re.compile(r'^(\s*(//)?#(if|ifn?def|elif)(\s+\S+)*)\s+(//.*)$')
commPatt = re.compile(r'^\s{20,}(//.*)?$')
col_value_lj = col_comment - patt['pad'] - 2
col_value_rj = col_comment - 3
#
# #define SKIP_ME
#
def trySkip1(d):
if skipPatt1.match(d['line']) == None: return False
logmsg("skip:", d['line'])
return True
#
# #define MY_PIN [pin]
#
def tryPindef(d):
line = d['line']
r = pindefPatt.match(line)
if r == None: return False
logmsg("pin:", line)
pinnum = r[4] if r[4][0] == 'P' else lpad(r[4], patt['pad'])
line = f'{r[1]} {r[3]}'
line = concat_with_space(rpad(line, col_value_lj), pinnum)
if r[5]: line = rpad(line, col_comment) + r[5]
d['line'] = line
return True
#
# #define MY_PIN -1
#
def tryNoPin(d):
line = d['line']
r = noPinPatt.match(line)
if r == None: return False
logmsg("pin -1:", line)
line = f'{r[1]} {r[3]}'
line = concat_with_space(rpad(line, col_value_lj), '-1')
if r[5]: line = rpad(line, col_comment) + r[5]
d['line'] = line
return True
#
# #define SKIP_ME_TOO
#
def trySkip2(d):
if skipPatt2.match( d['line']) == None: return False
logmsg("skip:", d['line'])
return True
#
# #else|endif
#
def trySkip3(d):
if skipPatt3.match( d['line']) == None: return False
logmsg("skip:", d['line'])
return True
#
# #define ALIAS OTHER
#
def tryAlias(d):
line = d['line']
r = aliasPatt.match(line)
if r == None: return False
logmsg("alias:", line)
line = f'{r[1]} {r[3]}'
line = concat_with_space(line, lpad(r[4], col_value_rj + 1 - len(line)))
if r[5]: line = concat_with_space(rpad(line, col_comment), r[5])
d['line'] = line
return True
#
# #define SWITCH
#
def trySwitch(d):
line = d['line']
r = switchPatt.match(line)
if r == None: return False
logmsg("switch:", line)
line = f'{r[1]} {r[3]}'
if r[4]: line = concat_with_space(rpad(line, col_comment), r[4])
d['line'] = line
d['check_comment_next'] = True
return True
#
# #define ...
#
def tryDef(d):
line = d['line']
r = defPatt.match(line)
if r == None: return False
logmsg("def:", line)
line = f'{r[1]} {r[3]} '
line = concat_with_space(line, lpad(r[4], col_value_rj + 1 - len(line)))
if r[5]: line = rpad(line, col_comment - 1) + ' ' + r[5]
d['line'] = line
return True
#
# #undef ...
#
def tryUndef(d):
line = d['line']
r = undefPatt.match(line)
if r == None: return False
logmsg("undef:", line)
line = f'{r[1]} {r[3]}'
if r[4]: line = concat_with_space(rpad(line, col_comment), r[4])
d['line'] = line
return True
#
# #if|ifdef|ifndef|elif ...
#
def tryCond(d):
line = d['line']
r = condPatt.match(line)
if r == None: return False
logmsg("cond:", line)
line = concat_with_space(rpad(r[1], col_comment), r[5])
d['line'] = line
d['check_comment_next'] = True
return True
out = ''
wDict = { 'check_comment_next': False }
# Transform each line and add it to the output
for line in txt.split('\n'):
wDict['line'] = line
if wDict['check_comment_next']:
r = commPatt.match(line)
wDict['check_comment_next'] = (r != None)
if wDict['check_comment_next']:
# Comments in column 50
line = rpad('', col_comment) + r[1]
elif trySkip1(wDict): pass #define SKIP_ME
elif tryPindef(wDict): pass #define MY_PIN [pin]
elif tryNoPin(wDict): pass #define MY_PIN -1
elif trySkip2(wDict): pass #define SKIP_ME_TOO
elif trySkip3(wDict): pass #else|endif
elif tryAlias(wDict): pass #define ALIAS OTHER
elif trySwitch(wDict): pass #define SWITCH
elif tryDef(wDict): pass #define ...
elif tryUndef(wDict): pass #undef ...
elif tryCond(wDict): pass #if|ifdef|ifndef|elif ...
out += wDict['line'].rstrip() + '\n'
return re.sub('\n\n$', '\n', re.sub(r'\n\n+', '\n\n', out))
# Python standard startup for command line with arguments
if __name__ == '__main__':
format_pins(sys.argv[1:])

View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
#
# Utility to compress Marlin RGB565 TFT data to RLE16 format.
# Reads the existing Marlin RGB565 cpp file and generates a new file with the additional RLE16 data.
#
# Usage: rle16_compress_cpp_image_data.py INPUT_FILE.cpp OUTPUT_FILE.cpp
#
import sys,struct
import re
def addCompressedData(input_file, output_file):
ofile = open(output_file, 'wt')
c_data_section = False
c_skip_data = False
c_footer = False
raw_data = []
rle_value = []
rle_count = []
arrname = ''
line = input_file.readline()
while line:
if not c_footer:
if not c_skip_data: ofile.write(line)
if "};" in line:
c_skip_data = False
c_data_section = False
c_footer = True
if c_data_section:
cleaned = re.sub(r"\s|,|\n", "", line)
as_list = cleaned.split("0x")
as_list.pop(0)
raw_data += [int(x, 16) for x in as_list]
if "const uint" in line:
# e.g.: const uint16_t marlin_logo_480x320x16[153600] = {
if "_rle16" in line:
c_skip_data = True
else:
c_data_section = True
arrname = line.split('[')[0].split(' ')[-1]
print("Found data array", arrname)
line = input_file.readline()
input_file.close()
#
# RLE16 (run length 16) encoding
# Convert data from from raw RGB565 to a simple run-length-encoded format for each word of data.
# - Each sequence begins with a count byte N.
# - If the high bit is set in N the run contains N & 0x7F + 1 unique words.
# - Otherwise it repeats the following word N + 1 times.
# - Each RGB565 word is stored in MSB / LSB order.
#
def rle_encode(data):
warn = "This may take a while" if len(data) > 300000 else ""
print("Compressing image data...", warn)
rledata = []
distinct = []
i = 0
while i < len(data):
v = data[i]
i += 1
rsize = 1
for j in range(i, len(data)):
if v != data[j]: break
i += 1
rsize += 1
if rsize >= 128: break
# If the run is one, add to the distinct values
if rsize == 1: distinct.append(v)
# If distinct length >= 127, or the repeat run is 2 or more,
# store the distinct run.
nr = len(distinct)
if nr and (nr >= 128 or rsize > 1 or i >= len(data)):
rledata += [(nr - 1) | 0x80] + distinct
distinct = []
# If the repeat run is 2 or more, store the repeat run.
if rsize > 1: rledata += [rsize - 1, v]
return rledata
def append_byte(data, byte, cols=240):
if data == '': data = ' '
data += ('0x{0:02X}, '.format(byte)) # 6 characters
if len(data) % (cols * 6 + 2) == 0: data = data.rstrip() + "\n "
return data
def rle_emit(ofile, arrname, rledata, rawsize):
col = 0
i = 0
outstr = ''
size = 0
while i < len(rledata):
rval = rledata[i]
i += 1
if rval & 0x80:
count = (rval & 0x7F) + 1
outstr = append_byte(outstr, rval)
size += 1
for j in range(count):
outstr = append_byte(outstr, rledata[i + j] >> 8)
outstr = append_byte(outstr, rledata[i + j] & 0xFF)
size += 2
i += count
else:
outstr = append_byte(outstr, rval)
outstr = append_byte(outstr, rledata[i] >> 8)
outstr = append_byte(outstr, rledata[i] & 0xFF)
i += 1
size += 3
outstr = outstr.rstrip()[:-1]
ofile.write("\n// Saves %i bytes\nconst uint8_t %s_rle16[%d] = {\n%s\n};\n" % (rawsize - size, arrname, size, outstr))
(w, h, d) = arrname.split("_")[-1].split('x')
ofile.write("\nconst tImage MarlinLogo{0}x{1}x16 = MARLIN_LOGO_CHOSEN({0}, {1});\n".format(w, h))
ofile.write("\n#endif // HAS_GRAPHICAL_TFT && SHOW_BOOTSCREEN\n".format(w, h))
# Encode the data, write it out, close the file
rledata = rle_encode(raw_data)
rle_emit(ofile, arrname, rledata, len(raw_data) * 2)
ofile.close()
if len(sys.argv) <= 2:
print("Utility to compress Marlin RGB565 TFT data to RLE16 format.")
print("Reads a Marlin RGB565 cpp file and generates a new file with the additional RLE16 data.")
print("Usage: rle16_compress_cpp_image_data.py INPUT_FILE.cpp OUTPUT_FILE.cpp")
exit(1)
output_cpp = sys.argv[2]
inname = sys.argv[1].replace('//', '/')
input_cpp = open(inname)
print("Processing", inname, "...")
addCompressedData(input_cpp, output_cpp)

View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python3
#
# Bitwise RLE compress a Marlin mono DOGM bitmap.
# Input: An existing Marlin Marlin mono DOGM bitmap .cpp or .h file.
# Output: A new file with the original and compressed data.
#
# Usage: rle_compress_bitmap.py INPUT_FILE OUTPUT_FILE
#
import sys,struct
import re
def addCompressedData(input_file, output_file):
ofile = open(output_file, 'wt')
datatype = "uint8_t"
bytewidth = 16
raw_data = []
arrname = ''
c_data_section = False ; c_skip_data = False ; c_footer = False
while True:
line = input_file.readline()
if not line: break
if not c_footer:
if not c_skip_data: ofile.write(line)
mat = re.match(r'.+CUSTOM_BOOTSCREEN_BMPWIDTH\s+(\d+)', line)
if mat: bytewidth = (int(mat[1]) + 7) // 8
if "};" in line:
c_skip_data = False
c_data_section = False
c_footer = True
if c_data_section:
cleaned = re.sub(r"\s|,|\n", "", line)
mat = re.match(r'(0b|B)[01]{8}', cleaned)
if mat:
as_list = cleaned.split(mat[1])
as_list.pop(0)
raw_data += [int(x, 2) for x in as_list]
else:
as_list = cleaned.split("0x")
as_list.pop(0)
raw_data += [int(x, 16) for x in as_list]
mat = re.match(r'const (uint\d+_t|unsigned char)', line)
if mat:
# e.g.: const unsigned char custom_start_bmp[] PROGMEM = {
datatype = mat[0]
if "_rle" in line:
c_skip_data = True
else:
c_data_section = True
arrname = line.split('[')[0].split(' ')[-1]
print("Found data array", arrname)
input_file.close()
#print("\nRaw Bitmap Data", raw_data)
#
# Bitwise RLE (run length) encoding
# Convert data from raw mono bitmap to a bitwise run-length-encoded format.
# - The first nybble is the starting bit state. Changing this nybble inverts the bitmap.
# - The following bytes provide the runs for alternating on/off bits.
# - A value of 0-14 encodes a run of 1-15.
# - A value of 16 indicates a run of 16-270 calculated using the next two bytes.
#
def bitwise_rle_encode(data):
def get_bit(data, n): return 1 if (data[n // 8] & (0x80 >> (n & 7))) else 0
def try_encode(data, isext):
bitslen = len(data) * 8
bitstate = get_bit(data, 0)
rledata = [ bitstate ]
bigrun = 256 if isext else 272
medrun = False
i = 0
runlen = -1
while i <= bitslen:
if i < bitslen: b = get_bit(data, i)
runlen += 1
if bitstate != b or i == bitslen:
if runlen >= bigrun:
isext = True
if medrun: return [], isext
rem = runlen & 0xFF
rledata += [ 15, 15, rem // 16, rem % 16 ]
elif runlen >= 16:
rledata += [ 15, runlen // 16 - 1, runlen % 16 ]
if runlen >= 256: medrun = True
else:
rledata += [ runlen - 1 ]
bitstate ^= 1
runlen = 0
i += 1
#print("\nrledata", rledata)
encoded = []
ri = 0
rlen = len(rledata)
while ri < rlen:
v = rledata[ri] << 4
if (ri < rlen - 1): v |= rledata[ri + 1]
encoded += [ v ]
ri += 2
#print("\nencoded", encoded)
return encoded, isext
# Try to encode with the original isext flag
warn = "This may take a while" if len(data) > 300000 else ""
print("Compressing image data...", warn)
isext = False
encoded, isext = try_encode(data, isext)
if len(encoded) == 0:
encoded, isext = try_encode(data, True)
return encoded, isext
def bitwise_rle_decode(isext, rledata, invert=0):
expanded = []
for n in rledata: expanded += [ n >> 4, n & 0xF ]
decoded = []
bitstate = 0 ; workbyte = 0 ; outindex = 0
i = 0
while i < len(expanded):
c = expanded[i]
i += 1
if i == 1: bitstate = c ; continue
if c == 15:
d = expanded[i] ; e = expanded[i + 1]
if isext and d == 15:
c = 256 + 16 * e + expanded[i + 2] - 1
i += 1
else:
c = 16 * d + e + 15
i += 2
for _ in range(c, -1, -1):
bitval = 0x80 >> (outindex & 7)
if bitstate: workbyte |= bitval
if bitval == 1:
decoded += [ workbyte ]
workbyte = 0
outindex += 1
bitstate ^= 1
print("\nDecoded RLE data:")
pretty = [ '{0:08b}'.format(v) for v in decoded ]
rows = [pretty[i:i+bytewidth] for i in range(0, len(pretty), bytewidth)]
for row in rows: print(f"{''.join(row)}")
return decoded
def rle_emit(ofile, arrname, rledata, rawsize, isext):
outstr = ''
rows = [ rledata[i:i+16] for i in range(0, len(rledata), 16) ]
for i in range(0, len(rows)):
rows[i] = [ '0x{0:02X}'.format(v) for v in rows[i] ]
outstr += f" {', '.join(rows[i])},\n"
outstr = outstr[:-2]
size = len(rledata)
defname = 'COMPACT_CUSTOM_BOOTSCREEN_EXT' if isext else 'COMPACT_CUSTOM_BOOTSCREEN'
ofile.write(f"\n// Saves {rawsize - size} bytes\n#define {defname}\n{datatype} {arrname}_rle[{size}] PROGMEM = {{\n{outstr}\n}};\n")
# Encode the data, write it out, close the file
rledata, isext = bitwise_rle_encode(raw_data)
rle_emit(ofile, arrname, rledata, len(raw_data), isext)
ofile.close()
# Validate that code properly compressed (and decompressed) the data
checkdata = bitwise_rle_decode(isext, rledata)
for i in range(0, len(checkdata)):
if raw_data[i] != checkdata[i]:
print(f'Data mismatch at byte offset {i} (should be {raw_data[i]} but got {checkdata[i]})')
break
if len(sys.argv) <= 2:
print('Usage: rle_compress_bitmap.py INPUT_FILE OUTPUT_FILE')
exit(1)
output_cpp = sys.argv[2]
inname = sys.argv[1].replace('//', '/')
try:
input_cpp = open(inname)
print("Processing", inname, "...")
addCompressedData(input_cpp, output_cpp)
except OSError:
print("Can't find input file", inname)

View File

@@ -0,0 +1,342 @@
import argparse, sys, os, time, random, serial
from SCons.Script import DefaultEnvironment
env = DefaultEnvironment()
import MarlinBinaryProtocol
#-----------------#
# Upload Callback #
#-----------------#
def Upload(source, target, env):
#-------#
# Debug #
#-------#
Debug = False # Set to True to enable script debug
def debugPrint(data):
if Debug: print(f"[Debug]: {data}")
#------------------#
# Marlin functions #
#------------------#
def _GetMarlinEnv(marlinEnv, feature):
if not marlinEnv: return None
return marlinEnv[feature] if feature in marlinEnv else None
#----------------#
# Port functions #
#----------------#
def _GetUploadPort(env):
debugPrint('Autodetecting upload port...')
env.AutodetectUploadPort(env)
portName = env.subst('$UPLOAD_PORT')
if not portName:
raise Exception('Error detecting the upload port.')
debugPrint('OK')
return portName
#-------------------------#
# Simple serial functions #
#-------------------------#
def _OpenPort():
# Open serial port
if port.is_open: return
debugPrint('Opening upload port...')
port.open()
port.reset_input_buffer()
debugPrint('OK')
def _ClosePort():
# Open serial port
if port is None: return
if not port.is_open: return
debugPrint('Closing upload port...')
port.close()
debugPrint('OK')
def _Send(data):
debugPrint(f'>> {data}')
strdata = bytearray(data, 'utf8') + b'\n'
port.write(strdata)
time.sleep(0.010)
def _Recv():
clean_responses = []
responses = port.readlines()
for Resp in responses:
# Suppress invalid chars (coming from debug info)
try:
clean_response = Resp.decode('utf8').rstrip().lstrip()
clean_responses.append(clean_response)
debugPrint(f'<< {clean_response}')
except:
pass
return clean_responses
#------------------#
# SDCard functions #
#------------------#
def _CheckSDCard():
debugPrint('Checking SD card...')
_Send('M21')
Responses = _Recv()
if len(Responses) < 1 or not any('SD card ok' in r for r in Responses):
raise Exception('Error accessing SD card')
debugPrint('SD Card OK')
return True
#----------------#
# File functions #
#----------------#
def _GetFirmwareFiles(UseLongFilenames):
debugPrint('Get firmware files...')
_Send(f"M20 F{'L' if UseLongFilenames else ''}")
Responses = _Recv()
if len(Responses) < 3 or not any('file list' in r for r in Responses):
raise Exception('Error getting firmware files')
debugPrint('OK')
return Responses
def _FilterFirmwareFiles(FirmwareList, UseLongFilenames):
Firmwares = []
for FWFile in FirmwareList:
# For long filenames take the 3rd column of the firmwares list
if UseLongFilenames:
Space = 0
Space = FWFile.find(' ')
if Space >= 0: Space = FWFile.find(' ', Space + 1)
if Space >= 0: FWFile = FWFile[Space + 1:]
if not '/' in FWFile and '.BIN' in FWFile.upper():
Firmwares.append(FWFile[:FWFile.upper().index('.BIN') + 4])
return Firmwares
def _RemoveFirmwareFile(FirmwareFile):
_Send(f'M30 /{FirmwareFile}')
Responses = _Recv()
Removed = len(Responses) >= 1 and any('File deleted' in r for r in Responses)
if not Removed:
raise Exception(f"Firmware file '{FirmwareFile}' not removed")
return Removed
def _RollbackUpload(FirmwareFile):
if not rollback: return
print(f"Rollback: trying to delete firmware '{FirmwareFile}'...")
_OpenPort()
# Wait for SD card release
time.sleep(1)
# Remount SD card
_CheckSDCard()
print(' OK' if _RemoveFirmwareFile(FirmwareFile) else ' Error!')
_ClosePort()
#---------------------#
# Callback Entrypoint #
#---------------------#
port = None
protocol = None
filetransfer = None
rollback = False
# Get Marlin evironment vars
MarlinEnv = env['MARLIN_FEATURES']
marlin_pioenv = _GetMarlinEnv(MarlinEnv, 'PIOENV')
marlin_motherboard = _GetMarlinEnv(MarlinEnv, 'MOTHERBOARD')
marlin_board_info_name = _GetMarlinEnv(MarlinEnv, 'BOARD_INFO_NAME')
marlin_board_custom_build_flags = _GetMarlinEnv(MarlinEnv, 'BOARD_CUSTOM_BUILD_FLAGS')
marlin_firmware_bin = _GetMarlinEnv(MarlinEnv, 'FIRMWARE_BIN')
marlin_long_filename_host_support = _GetMarlinEnv(MarlinEnv, 'LONG_FILENAME_HOST_SUPPORT') is not None
marlin_longname_write = _GetMarlinEnv(MarlinEnv, 'LONG_FILENAME_WRITE_SUPPORT') is not None
marlin_custom_firmware_upload = _GetMarlinEnv(MarlinEnv, 'CUSTOM_FIRMWARE_UPLOAD') is not None
marlin_short_build_version = _GetMarlinEnv(MarlinEnv, 'SHORT_BUILD_VERSION')
marlin_string_config_h_author = _GetMarlinEnv(MarlinEnv, 'STRING_CONFIG_H_AUTHOR')
# Get firmware upload params
upload_firmware_source_path = os.path.join(env["PROJECT_BUILD_DIR"], env["PIOENV"], f"{env['PROGNAME']}.bin") if 'PROGNAME' in env else str(source[0])
# Source firmware filename
upload_speed = env['UPLOAD_SPEED'] if 'UPLOAD_SPEED' in env else 115200
# baud rate of serial connection
upload_port = _GetUploadPort(env) # Serial port to use
# Set local upload params
upload_firmware_target_name = os.path.basename(upload_firmware_source_path)
# Target firmware filename
upload_timeout = 1000 # Communication timout, lossy/slow connections need higher values
upload_blocksize = 512 # Transfer block size. 512 = Autodetect
upload_compression = True # Enable compression
upload_error_ratio = 0 # Simulated corruption ratio
upload_test = False # Benchmark the serial link without storing the file
upload_reset = True # Trigger a soft reset for firmware update after the upload
# Set local upload params based on board type to change script behavior
# "upload_delete_old_bins": delete all *.bin files in the root of SD Card
upload_delete_old_bins = marlin_motherboard in ['BOARD_CREALITY_V4', 'BOARD_CREALITY_V4210', 'BOARD_CREALITY_V422', 'BOARD_CREALITY_V423',
'BOARD_CREALITY_V427', 'BOARD_CREALITY_V431', 'BOARD_CREALITY_V452', 'BOARD_CREALITY_V453',
'BOARD_CREALITY_V24S1']
# "upload_random_name": generate a random 8.3 firmware filename to upload
upload_random_filename = upload_delete_old_bins and not marlin_long_filename_host_support
# Heatshrink module is needed (only) for compression
if upload_compression:
if sys.version_info[0] > 2:
try:
import heatshrink2
except ImportError:
print("Installing 'heatshrink2' python module...")
env.Execute(env.subst("$PYTHONEXE -m pip install heatshrink2"))
else:
try:
import heatshrink
except ImportError:
print("Installing 'heatshrink' python module...")
env.Execute(env.subst("$PYTHONEXE -m pip install heatshrink"))
try:
# Start upload job
print(f"Uploading firmware '{os.path.basename(upload_firmware_target_name)}' to '{marlin_motherboard}' via '{upload_port}'")
# Dump some debug info
if Debug:
print('Upload using:')
print('---- Marlin -----------------------------------')
print(f' PIOENV : {marlin_pioenv}')
print(f' SHORT_BUILD_VERSION : {marlin_short_build_version}')
print(f' STRING_CONFIG_H_AUTHOR : {marlin_string_config_h_author}')
print(f' MOTHERBOARD : {marlin_motherboard}')
print(f' BOARD_INFO_NAME : {marlin_board_info_name}')
print(f' CUSTOM_BUILD_FLAGS : {marlin_board_custom_build_flags}')
print(f' FIRMWARE_BIN : {marlin_firmware_bin}')
print(f' LONG_FILENAME_HOST_SUPPORT : {marlin_long_filename_host_support}')
print(f' LONG_FILENAME_WRITE_SUPPORT : {marlin_longname_write}')
print(f' CUSTOM_FIRMWARE_UPLOAD : {marlin_custom_firmware_upload}')
print('---- Upload parameters ------------------------')
print(f' Source : {upload_firmware_source_path}')
print(f' Target : {upload_firmware_target_name}')
print(f' Port : {upload_port} @ {upload_speed} baudrate')
print(f' Timeout : {upload_timeout}')
print(f' Block size : {upload_blocksize}')
print(f' Compression : {upload_compression}')
print(f' Error ratio : {upload_error_ratio}')
print(f' Test : {upload_test}')
print(f' Reset : {upload_reset}')
print('-----------------------------------------------')
# Custom implementations based on board parameters
# Generate a new 8.3 random filename
if upload_random_filename:
upload_firmware_target_name = f"fw-{''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789', k=5))}.BIN"
print(f"Board {marlin_motherboard}: Overriding firmware filename to '{upload_firmware_target_name}'")
# Delete all *.bin files on the root of SD Card (if flagged)
if upload_delete_old_bins:
# CUSTOM_FIRMWARE_UPLOAD is needed for this feature
if not marlin_custom_firmware_upload:
raise Exception(f"CUSTOM_FIRMWARE_UPLOAD must be enabled in 'Configuration_adv.h' for '{marlin_motherboard}'")
# Init & Open serial port
port = serial.Serial(upload_port, baudrate = upload_speed, write_timeout = 0, timeout = 0.1)
_OpenPort()
# Check SD card status
_CheckSDCard()
# Get firmware files
FirmwareFiles = _GetFirmwareFiles(marlin_long_filename_host_support)
if Debug:
for FirmwareFile in FirmwareFiles:
print(f'Found: {FirmwareFile}')
# Get all 1st level firmware files (to remove)
OldFirmwareFiles = _FilterFirmwareFiles(FirmwareFiles[1:len(FirmwareFiles)-2], marlin_long_filename_host_support) # Skip header and footers of list
if len(OldFirmwareFiles) == 0:
print('No old firmware files to delete')
else:
print(f"Remove {len(OldFirmwareFiles)} old firmware file{'s' if len(OldFirmwareFiles) != 1 else ''}:")
for OldFirmwareFile in OldFirmwareFiles:
print(f" -Removing- '{OldFirmwareFile}'...")
print(' OK' if _RemoveFirmwareFile(OldFirmwareFile) else ' Error!')
# Close serial
_ClosePort()
# Cleanup completed
debugPrint('Cleanup completed')
# WARNING! The serial port must be closed here because the serial transfer that follow needs it!
# Upload firmware file
debugPrint(f"Copy '{upload_firmware_source_path}' --> '{upload_firmware_target_name}'")
protocol = MarlinBinaryProtocol.Protocol(upload_port, upload_speed, upload_blocksize, float(upload_error_ratio), int(upload_timeout))
#echologger = MarlinBinaryProtocol.EchoProtocol(protocol)
protocol.connect()
# Mark the rollback (delete broken transfer) from this point on
rollback = True
filetransfer = MarlinBinaryProtocol.FileTransferProtocol(protocol)
transferOK = filetransfer.copy(upload_firmware_source_path, upload_firmware_target_name, upload_compression, upload_test)
protocol.disconnect()
# Notify upload completed
protocol.send_ascii('M117 Firmware uploaded' if transferOK else 'M117 Firmware upload failed')
# Remount SD card
print('Wait for SD card release...')
time.sleep(1)
print('Remount SD card')
protocol.send_ascii('M21')
# Transfer failed?
if not transferOK:
protocol.shutdown()
_RollbackUpload(upload_firmware_target_name)
else:
# Trigger firmware update
if upload_reset:
print('Trigger firmware update...')
protocol.send_ascii('M997', True)
protocol.shutdown()
print('Firmware update completed' if transferOK else 'Firmware update failed')
return 0 if transferOK else -1
except KeyboardInterrupt:
print('Aborted by user')
if filetransfer: filetransfer.abort()
if protocol:
protocol.disconnect()
protocol.shutdown()
_RollbackUpload(upload_firmware_target_name)
_ClosePort()
raise
except serial.SerialException as se:
# This exception is raised only for send_ascii data (not for binary transfer)
print(f'Serial excepion: {se}, transfer aborted')
if protocol:
protocol.disconnect()
protocol.shutdown()
_RollbackUpload(upload_firmware_target_name)
_ClosePort()
raise Exception(se)
except MarlinBinaryProtocol.FatalError:
print('Too many retries, transfer aborted')
if protocol:
protocol.disconnect()
protocol.shutdown()
_RollbackUpload(upload_firmware_target_name)
_ClosePort()
raise
except Exception as ex:
print(f"\nException: {ex}, transfer aborted")
if protocol:
protocol.disconnect()
protocol.shutdown()
_RollbackUpload(upload_firmware_target_name)
_ClosePort()
print('Firmware not updated')
raise
# Attach custom upload callback
env.Replace(UPLOADCMD=Upload)