mirror of
ssh://git.janware.com/srv/git/janware/proj/jw-devtest
synced 2026-01-15 02:22:56 +01:00
248 lines
8.9 KiB
Python
248 lines
8.9 KiB
Python
|
|
#!/usr/bin/python3
|
||
|
|
# -*- coding: utf-8 -*-
|
||
|
|
|
||
|
|
import re
|
||
|
|
import os
|
||
|
|
import struct
|
||
|
|
import jwutils
|
||
|
|
from jwutils.log import *
|
||
|
|
|
||
|
|
HPC = 255  # heads per cylinder of the CHS geometry used for CHS <-> LBA conversion
|
||
|
|
SPT = 63  # sectors per track of the CHS geometry used for CHS <-> LBA conversion
|
||
|
|
BPS = 512  # bytes per sector; byte addresses are divided by this to get sector numbers
|
||
|
|
|
||
|
|
class CmdMem2Disk(jwutils.Cmd):
|
||
|
|
|
||
|
|
class Chunk:
    """A region of the input described by its size, optional offset and name."""

    def __init__(self, size, offset=None, name=None):
        """Store the three descriptive values exactly as they were passed in."""
        self.name, self.size, self.offset = name, size, offset

    def __str__(self):
        """Return a compact ``n:<name>, s:<size>, o:<offset>`` summary."""
        fields = (self.name, self.size, self.offset)
        return "n:{}, s:{}, o:{}".format(*fields)
|
||
|
|
|
||
|
|
class Part(Chunk):
    """A chunk with a target address (``addr``) and a block-aligned size."""

    def __init__(self, rhs):
        """Build a part from *rhs*, either a plain chunk or another part.

        Copying from an instance of the same class keeps its ``addr`` and
        ``aligned_size``; anything else starts without an address and gets
        its size rounded up by ``CmdMem2Disk.align_to_block``.
        """
        super().__init__(rhs.size, rhs.offset, rhs.name)
        copying = isinstance(rhs, self.__class__)
        self.addr = rhs.addr if copying else None
        self.aligned_size = (
            rhs.aligned_size if copying else CmdMem2Disk.align_to_block(self.size)
        )

    def __str__(self):
        """Extend the base summary with ``a:<addr>, S:<aligned size>``."""
        summary = super().__str__()
        return summary + ", a:{}, S:{}".format(self.addr, self.aligned_size)
|
||
|
|
|
||
|
|
def __to_int(self, s):
|
||
|
|
if isinstance(s, int):
|
||
|
|
return s
|
||
|
|
if not s or not len(s):
|
||
|
|
return None
|
||
|
|
if s.find("0x") != -1:
|
||
|
|
return int(s, 16)
|
||
|
|
return int(s)
|
||
|
|
|
||
|
|
def __parse_mtdparts(self, spec, total_size=None):
    """Parse an mtdparts-style layout string into a list of chunks.

    Example input:
    flash:0x80000@0x50000000(u-boot),0x800000@0x2000000(fb),0x1600000(rootfs),0x180000(data)

    Several ``<id>:<partdef>[,<partdef>...]`` definitions may be joined
    with ``;``. A partition without a ``(name)`` is named after its
    running number; a size of ``-`` takes whatever is left of
    *total_size*.

    Args:
        spec: The layout string.
        total_size: Number of input bytes available, or ``None`` if
            unknown (then ``-`` sizes cannot be resolved and no
            end-of-input check is made).

    Returns:
        A list of ``self.Chunk`` objects in definition order.

    Raises:
        Exception: On a definition without ``:``, an unparseable part
            definition, a partition starting after the end of the input,
            or a ``-`` size while *total_size* is unknown.
    """
    # Raw string: "\(" in a plain literal is an invalid escape sequence
    rx_partdef = re.compile(r"((0x)*[0-9a-f]+|[0-9]+|-)(@((0x)*[0-9a-f]+|[0-9]+))*(\(([a-zA-Z_-]+)\))*")
    r = []
    rest = total_size
    count = 1
    for mtddef_spec in spec.split(';'):
        mtddef = mtddef_spec.split(':')
        if len(mtddef) < 2:
            raise Exception("Missing ':' in mtd definition >{}<".format(mtddef_spec))
        for partdef_spec in mtddef[1].split(','):
            match = rx_partdef.search(partdef_spec)
            if not match:
                raise Exception("Unparseable part definition >{}<".format(partdef_spec))
            # group(0) is the whole match, so the last capture has index len(groups())
            for i in range(0, len(match.groups()) + 1):
                slog(DEBUG, "match", i, "=", match.group(i))
            size = match.group(1)
            offset = match.group(4)
            name = match.group(7)
            slog(DEBUG, "size={}, offset={}, name={}".format(size, offset, name))
            if not name:
                name = str(count)
            # Compare against None explicitly: a remainder of exactly 0 is
            # falsy but still means that the input is used up
            if rest is not None and rest <= 0:
                raise Exception("Partition \"{}\" starts after end of input".format(name))
            if size == '-':
                if rest is None:
                    raise Exception("Can't calculate length of partition \"{}\" with total space undefined".format(name))
                size = rest
            else:
                size = self.__to_int(size)
            offset = self.__to_int(offset)
            r.append(self.Chunk(size, offset, name))
            if rest is not None:
                rest -= size
            count += 1
    return r
|
||
|
|
|
||
|
|
def __lba(self, c, h, s):
    """Convert a cylinder/head/sector triple to a linear sector number.

    Sectors are counted from 1 within a track, so ``(0, 0, 1)`` maps to 0.
    """
    track = c * HPC + h
    return track * SPT + (s - 1)
|
||
|
|
|
||
|
|
def __chs(self, addr):
    """Convert a byte address to a ``(cylinder, head, sector)`` triple.

    The address is first reduced to the number of the sector containing
    it (``addr // BPS``) and then split according to the HPC/SPT
    geometry. The returned sector is 1-based, cylinder and head are
    0-based.

    Integer floor division is used throughout so that the result is exact
    for any address; float division loses precision on very large values.
    """
    lba = int(addr) // BPS
    c = lba // (HPC * SPT)
    h = (lba // SPT) % HPC
    s = lba % SPT + 1
    return (c, h, s)
|
||
|
|
|
||
|
|
def __chs_bytes(self, addr):
    """Return the CHS position of *addr* packed into three byte values.

    The result is ``(cylinder byte, head, sector byte)``: the cylinder
    byte holds the low eight cylinder bits, and cylinder bits 8-9 are
    moved into the top two bits of the sector byte.
    """
    c, h, s = self.__chs(addr)
    cyl_low = c & 0xFF
    sec_and_cyl_high = ((c >> 2) & 0xC0) + s
    slog(DEBUG, "chs=(%d,%d,%d)" % (c, h, s))
    slog(DEBUG, "chs-bytes=(%d,%d,%d)" % (cyl_low, h, sec_and_cyl_high))
    return (cyl_low, h, sec_and_cyl_high)
|
||
|
|
|
||
|
|
def __init__(self):
    """Initialise the base command with the name ``mem2disk`` and its help text."""
    description = 'Create partitioned disk image from raw memory dump'
    super().__init__(name='mem2disk', help=description)
|
||
|
|
|
||
|
|
@classmethod
def align_to_block(cls, n):
    """Round *n* up to the next multiple of ``BPS``.

    Values that already are a multiple of ``BPS`` (including 0) are
    returned unchanged. Integer arithmetic keeps the result exact for
    arbitrarily large sizes, which float division does not guarantee.
    """
    remainder = n % BPS
    if remainder == 0:
        return n
    return n - remainder + BPS
|
||
|
|
|
||
|
|
# C in [0,1023], H in [0,254] (HPC = 255 heads), S in [1,63]
# C/H/S
# 0/0/1 MBR
# 0/1/1 BR1
# 0/1/2 Data1
# X/0/1 BRN
# X/0/2 DataN
|
||
|
|
def align_to_cylinder(self, addr, reserve_boot_sector = True):
    """Return the address at or after *addr* where a partition may start.

    Addresses below C/H/S 0/1/1 are raised to that position. An address
    above it is kept if it lies exactly on a sector boundary; otherwise
    it is moved to the first sector of the next cylinder. With
    *reserve_boot_sector* set, one more sector (``BPS`` bytes) is added
    to the result.

    NOTE(review): an address that is sector-aligned but not at the start
    of a cylinder is returned unchanged - confirm that this is intended,
    given the method's name.
    """
    first_usable = BPS * self.__lba(0, 1, 1)
    aligned = addr
    if addr < first_usable:
        aligned = first_usable
    elif addr != first_usable:
        c, h, s = self.__chs(addr)
        sector_start = BPS * self.__lba(c, h, s)
        if sector_start != addr:
            # addr points into the middle of a sector
            assert sector_start < addr
            aligned = BPS * self.__lba(c + 1, 0, 1)
    if reserve_boot_sector:
        return aligned + BPS
    return aligned
|
||
|
|
|
||
|
|
def append_part(self, chunk, cur):
    """Append *chunk* to the image buffer *cur*, preceded by a header sector.

    Args:
        chunk: Bytes-like partition payload.
        cur: The image built so far as a ``bytearray``, or ``None`` to
            start a new one.

    Returns:
        The buffer, zero-padded up to the aligned start address, with the
        header in the sector just before that address and *chunk* after it.
    """
    if cur is None:
        cur = bytearray()
    start_ = len(cur)
    start = self.align_to_cylinder(start_, True)
    cur.extend(bytearray(start - start_))
    # Slice assignment: indexing a bytearray with the tuple (start-BPS, start)
    # raises TypeError.
    # NOTE(review): create_part_header is not defined in this file -
    # confirm where it is supposed to come from.
    cur[start-BPS:start] = create_part_header(start, len(chunk))
    cur.extend(chunk)
    return cur
|
||
|
|
|
||
|
|
def create_part_table_entry(self, part):
    """Pack *part* into a 16-byte partition table entry.

    Field order: flag byte 0x0 (not bootable), start head / sector byte /
    cylinder byte, type byte 0x83, end head / sector byte / cylinder
    byte, then start sector and sector count as little-endian 32-bit
    values. ``part.addr`` and ``part.aligned_size`` must be whole
    multiples of ``BPS``; this is asserted.
    """
    slog(DEBUG, "--- creating table entry >{}<".format(part))
    first_sector = int(part.addr / BPS + .5)
    sector_count = int(part.aligned_size / BPS + .5)
    assert first_sector == part.addr / BPS
    slog(DEBUG, "size_sec = ", sector_count, "part.aligned_size = ", part.aligned_size, "part.aligned_size / BPS = ", part.aligned_size / BPS)
    assert sector_count == part.aligned_size / BPS
    chs_first = self.__chs_bytes(part.addr)
    # The end position is that of the partition's last byte
    chs_last = self.__chs_bytes(part.addr + part.aligned_size - 1)
    slog(DEBUG, "chs_start=", chs_first)
    slog(DEBUG, "chs_end=", chs_last)
    first_cyl, first_head, first_sec = chs_first
    last_cyl, last_head, last_sec = chs_last
    fields = (
        0x0,   # not bootable
        first_head, first_sec, first_cyl,
        0x83,  # fstype
        last_head, last_sec, last_cyl,
        first_sector, sector_count,
    )
    return struct.pack("<BBBBBBBBLL", *fields)
|
||
|
|
#1, 2)
|
||
|
|
|
||
|
|
def create_mbr(self, parts):
    """Build the 512-byte first sector of the image.

    Layout: 446 zero bytes, four 16-byte partition table slots (filled
    from the first four entries of *parts*, zeroed where there is none)
    and the two signature bytes 0x55 0xaa.
    """
    mbr = bytearray(446)
    for slot in range(4):
        if slot < len(parts):
            entry = self.create_part_table_entry(parts[slot])
        else:
            entry = bytes(16)
        mbr.extend(entry)
    mbr.extend(bytes((0x55, 0xaa)))
    return mbr
|
||
|
|
|
||
|
|
def add_parser(self, parsers):
    """Add the options and positional arguments of this command to its parser."""
    parser = super().add_parser(parsers)
    parser.add_argument(
        "--mtdparts",
        help="Partitioning definition of the memory conforming to mtdparts-syntax",
    )
    parser.add_argument(
        "--no-boot-sectors",
        action='store_true',
        default=False,
        help="Don't add (empty) boot sectors at the beginning of each partition",
    )
    parser.add_argument(
        "--pad-byte",
        type=int,
        default=0x0,
        help="Byte value to pad empty areas with",
    )
    parser.add_argument("input", help="Path to input memory chunk")
    parser.add_argument("output", help="Path to output disk image file")
|
||
|
|
|
||
|
|
def write(self, f, data):
    """Write all of *data* to *f*, continuing after short writes.

    Args:
        f: Writable file-like object whose ``write()`` returns the number
            of items it accepted.
        data: The payload; must support ``len()`` and slicing.

    Returns:
        ``len(data)``. For empty *data* this is 0 and ``f.write()`` is not
        called.

    Raises:
        Exception: If ``f.write()`` reports that nothing was written.
    """
    total = len(data)
    if not total:
        return 0
    done = 0
    while done < total:
        # After a short write only the unwritten tail is sent again;
        # re-sending the whole buffer would duplicate what is already out
        pending = data[done:] if done else data
        written = f.write(pending)
        if not written:
            raise Exception("Failed to write data to file")
        done += written
    return total
|
||
|
|
|
||
|
|
def copy(self, fin, fout, n):
    """Copy *n* bytes from *fin* to *fout* in pieces of at most 1 MiB.

    Returns *n*. Raises ``Exception`` if *fin* yields no data before *n*
    bytes have been copied.
    """
    limit = 1024 * 1024
    remaining = n
    slog(DEBUG, "== copying ", n, "to output file offset", fout.tell())
    while remaining:
        slog(DEBUG, "left={}".format(remaining))
        bufsize = min(remaining, limit)
        data = fin.read(bufsize)
        if not data:
            raise Exception("Failed to read %d bytes from file" % bufsize)
        self.write(fout, data)
        # read() may return less than requested; count what actually arrived
        remaining -= len(data)
        assert remaining >= 0
    return n
|
||
|
|
|
||
|
|
async def run(self, args):
    """Create the output disk image described by the command-line *args*.

    The input is split into chunks according to ``args.mtdparts``. Every
    chunk becomes a part whose address is obtained from
    ``align_to_cylinder()``. The output receives the MBR first, then for
    each part: padding up to its address, an empty boot sector (unless
    ``args.no_boot_sectors`` is set), the part's data and padding up to
    its aligned size.

    Returns 0. Raises ``Exception`` if no layout definition was given.
    """

    def empty(size):
        # *size* bytes, each set to the configured pad byte
        return bytearray([self.__to_int(args.pad_byte)]) * size

    # possible other partition layout definition formats (args.xxx) go here
    if not args.mtdparts:
        raise Exception("No memory layout definition given")
    if args.input in [ '/dev/zero' ]:
        total_size = None
    else:
        total_size = os.stat(args.input).st_size
    chunks = self.__parse_mtdparts(args.mtdparts, total_size=total_size)

    # First pass: assign an address to every part
    parts = []
    next_free = 0
    for chunk in chunks:
        part = self.Part(chunk)
        part.addr = self.align_to_cylinder(next_free, False)
        parts.append(part)
        next_free = part.addr + part.aligned_size + BPS

    # Second pass: write the image, tracking the output position in pos
    with open(args.output, mode='wb') as fout, open(args.input, mode='rb') as fin:
        pos = 0
        pos += self.write(fout, self.create_mbr(parts))
        for part in parts:
            if pos < part.addr:
                pos += self.write(fout, empty(part.addr - pos))
            if not args.no_boot_sectors:
                pos += self.write(fout, empty(BPS)) # empty boot sector
            pos += self.copy(fin, fout, part.size)
            pad = part.aligned_size - part.size
            assert pad >= 0
            if pad > 0:
                pos += self.write(fout, empty(pad))

    return 0
|
||
|
|
|
||
|
|
# Script entry point: the process exits with status 1 if
# run_sub_commands() returns a non-zero value and with status 0 otherwise.
exit(jwutils.run_sub_commands('File utilities') != 0)
|