#!/usr/bin/python3
# -*- coding: utf-8 -*-

"""mem2disk: create a partitioned (MBR) disk image from a raw memory dump.

NOTE(review): the line structure of this file had been lost and a span in
the middle of it was missing (the tail of ``create_part_table_entry``, all
of ``create_mbr`` and the head of ``write``).  Those parts were
reconstructed from their call sites and are marked with ``NOTE(review)``
below; compare them against the original revision before relying on them.
"""

import os
import re
import struct
import sys

import jwutils
from jwutils.log import DEBUG, slog

# Disk geometry used for all CHS <-> LBA conversions in this file.
HPC = 255  # heads per cylinder
SPT = 63   # sectors per track
BPS = 512  # bytes per sector

# Largest cylinder number representable in the 10-bit CHS cylinder field.
MAX_CYLINDER = 1023

# NOTE(review): the constants below belong to the reconstructed MBR code.
MBR_TABLE_OFFSET = 446        # partition table follows the boot code area
MBR_ENTRY_SIZE = 16           # bytes per partition table entry
MBR_MAX_PARTS = 4             # primary partitions in a classic MBR
MBR_SIGNATURE = b"\x55\xaa"   # boot signature in the last two bytes
PART_TYPE = 0x83              # NOTE(review): original type byte unknown -- confirm


class CmdMem2Disk(jwutils.Cmd):
    """Sub-command ``mem2disk``: wrap a raw memory dump into a disk image."""

    class Chunk:
        """A region of the input as described by the layout definition."""

        def __init__(self, size, offset=None, name=None):
            self.name = name
            self.size = size
            self.offset = offset

        def __str__(self):
            return "n:{}, s:{}, o:{}".format(self.name, self.size, self.offset)

    class Part(Chunk):
        """A chunk plus its placement (``addr``) and block-aligned size."""

        def __init__(self, rhs):
            super().__init__(rhs.size, rhs.offset, rhs.name)
            if isinstance(rhs, self.__class__):
                # Copy construction: take over the placement as well.
                self.addr = rhs.addr
                self.aligned_size = rhs.aligned_size
            else:
                self.addr = None
                self.aligned_size = CmdMem2Disk.align_to_block(self.size)

        def __str__(self):
            return super().__str__() + ", a:{}, S:{}".format(self.addr, self.aligned_size)

    def __to_int(self, s):
        """Convert *s* to int.

        Integers are returned unchanged, ``None`` / empty strings yield
        ``None``, strings containing ``0x`` are parsed as hexadecimal and
        everything else as decimal.
        """
        if isinstance(s, int):
            return s
        if not s:
            return None
        if "0x" in s.lower():
            return int(s, 16)
        return int(s)

    def __parse_mtdparts(self, spec, total_size=None):
        """Parse an mtdparts-style layout string into a list of ``Chunk``.

        Example *spec*:
        flash:0x80000@0x50000000(u-boot),0x800000@0x2000000(fb),0x1600000(rootfs),0x180000(data)

        *total_size* is the size of the input in bytes, or ``None`` if it is
        unknown; it is needed to resolve a size of ``-`` ("the rest").
        Unnamed partitions are named after their 1-based position.

        Raises ``Exception`` on unparseable definitions, on a partition that
        starts after the end of the input and on ``-`` with unknown total size.
        """
        rx_partdef = re.compile(
            r"((0x)*[0-9a-f]+|[0-9]+|-)(@((0x)*[0-9a-f]+|[0-9]+))*(\(([a-zA-Z_-]+)\))*")
        chunks = []
        rest = total_size
        count = 1
        for mtddef_spec in spec.split(';'):
            mtddef = mtddef_spec.split(':')
            if len(mtddef) < 2:
                raise Exception("Missing ':' in mtd definition >{}<".format(mtddef_spec))
            for partdef_spec in mtddef[1].split(','):
                match = rx_partdef.search(partdef_spec)
                if not match:
                    raise Exception("Unparseable part definition >{}<".format(partdef_spec))
                # Group 0 is the whole match, 1..groups are the sub-groups.
                for i in range(rx_partdef.groups + 1):
                    slog(DEBUG, "match", i, "=", match.group(i))
                size = match.group(1)
                offset = match.group(4)
                name = match.group(7)
                slog(DEBUG, "size={}, offset={}, name={}".format(size, offset, name))
                if not name:
                    name = str(count)
                # "is not None" matters: a remaining size of 0 must be
                # detected, not treated like "total size unknown".
                if rest is not None and rest <= 0:
                    raise Exception("Partition \"{}\" starts after end of input".format(name))
                if size == '-':
                    if rest is None:
                        raise Exception("Can't calculate length of partition \"{}\" with total space undefined".format(name))
                    size = rest
                else:
                    size = self.__to_int(size)
                offset = self.__to_int(offset)
                chunks.append(self.Chunk(size, offset, name))
                if rest is not None:
                    rest -= size
                count += 1
        return chunks

    def __lba(self, c, h, s):
        """Return the sector number (LBA) of cylinder/head/sector."""
        return (c * HPC + h) * SPT + (s - 1)

    def __chs(self, addr):
        """Return ``(c, h, s)`` of the sector containing byte address *addr*."""
        # Integer arithmetic keeps this exact for arbitrarily large addresses.
        lba = int(addr // BPS)
        c = lba // (HPC * SPT)
        h = (lba // SPT) % HPC
        s = lba % SPT + 1
        return (c, h, s)

    def __chs_bytes(self, addr):
        """Return ``(cylinder_low_byte, head, sector_byte)`` for *addr*.

        ``sector_byte`` carries the sector in its low 6 bits and bits 8-9 of
        the cylinder in its high 2 bits.
        """
        (c, h, s) = self.__chs(addr)
        if c > MAX_CYLINDER:
            # The address is beyond what 10 cylinder bits can express; use
            # the conventional "maximum" tuple instead of a wrapped value.
            (c, h, s) = (MAX_CYLINDER, HPC - 1, SPT)
        c_byte = c & 0xFF
        s_byte = ((c >> 2) & 0xC0) + s
        slog(DEBUG, "chs=(%d,%d,%d)" % (c, h, s))
        slog(DEBUG, "chs-bytes=(%d,%d,%d)" % (c_byte, h, s_byte))
        return (c_byte, h, s_byte)

    def __init__(self):
        super().__init__(name='mem2disk', help='Create partitioned disk image from raw memory dump')

    @classmethod
    def align_to_block(cls, n):
        """Round *n* up to the next multiple of ``BPS``."""
        return (n + BPS - 1) // BPS * BPS

    # C in [0,1023], H in [0,254] (HPC = 255), S in [1,63]
    # C/H/S
    # 0/0/1 MBR
    # 0/1/1 BR1
    # 0/1/2 Data1
    # X/0/1 BRN
    # X/0/2 Data1
    def align_to_cylinder(self, addr, reserve_boot_sector = True):
        """Return the address at or after *addr* where a partition may start.

        Addresses below C/H/S 0/1/1 are moved up to 0/1/1.  An address that
        is not sector-aligned is moved to the start of the next cylinder.
        With *reserve_boot_sector*, one additional sector is skipped.

        NOTE(review): an address that is sector-aligned but not
        cylinder-aligned is returned unchanged -- confirm this is intended.
        """
        lowest = self.__lba(0, 1, 1) * BPS
        if addr < lowest:
            addr = lowest
        elif addr != lowest:
            (c, h, s) = self.__chs(addr)
            lba = self.__lba(c, h, s) * BPS
            if lba != addr:
                assert lba < addr
                addr = self.__lba(c + 1, 0, 1) * BPS
        if reserve_boot_sector:
            addr += BPS
        return addr

    def append_part(self, chunk, cur):
        """Append the bytes *chunk* to the image buffer *cur* and return it.

        The data is placed at the next valid partition start, preceded by one
        reserved sector; the gap (including that sector) is zero-filled.
        *cur* may be ``None`` to start a new buffer.
        """
        if cur is None:
            cur = bytearray()
        start_ = len(cur)
        start = self.align_to_cylinder(start_, True)
        # The sector at [start - BPS, start) is the reserved boot sector.  The
        # original tried to fill it via an undefined create_part_header() and
        # a tuple index, which could only raise; it is left zeroed instead.
        cur.extend(bytearray(start - start_))
        cur.extend(chunk)
        return cur

    def create_part_table_entry(self, part):
        """Return the 16-byte MBR partition table entry for *part*.

        ``part.addr`` and ``part.aligned_size`` must be multiples of ``BPS``.
        """
        slog(DEBUG, "--- creating table entry >{}<".format(part))
        start_sec, start_rem = divmod(part.addr, BPS)
        size_sec, size_rem = divmod(part.aligned_size, BPS)
        assert start_rem == 0
        slog(DEBUG, "size_sec = ", size_sec, "part.aligned_size = ", part.aligned_size, "part.aligned_size / BPS = ", part.aligned_size / BPS)
        assert size_rem == 0
        chs_start = self.__chs_bytes(part.addr)
        chs_end = self.__chs_bytes(part.addr + part.aligned_size - 1)
        slog(DEBUG, "chs_start=", chs_start)
        slog(DEBUG, "chs_end=", chs_end)
        # NOTE(review): the arguments of this pack() call were lost.  This is
        # the standard entry layout: status, CHS start (head, sector,
        # cylinder), type, CHS end, first LBA, sector count.  Status 0x00
        # (not bootable) and PART_TYPE are assumptions -- confirm.
        return struct.pack(
            "<BBBBBBBBII",
            0x00,
            chs_start[1], chs_start[2], chs_start[0],
            PART_TYPE,
            chs_end[1], chs_end[2], chs_end[0],
            start_sec, size_sec)

    def create_mbr(self, parts):
        """Return one sector holding the partition table for *parts*.

        NOTE(review): reconstructed -- the original body was lost.  The only
        grounded facts are the call ``self.create_mbr(parts)`` in ``run()``
        and that its result is written at the start of the image.  Boot code
        area is zeroed; more than ``MBR_MAX_PARTS`` partitions are rejected.
        """
        if len(parts) > MBR_MAX_PARTS:
            raise Exception("Can't store {} partitions in an MBR, maximum is {}".format(
                len(parts), MBR_MAX_PARTS))
        mbr = bytearray(BPS)
        for index, part in enumerate(parts):
            begin = MBR_TABLE_OFFSET + index * MBR_ENTRY_SIZE
            mbr[begin:begin + MBR_ENTRY_SIZE] = self.create_part_table_entry(part)
        mbr[BPS - len(MBR_SIGNATURE):] = MBR_SIGNATURE
        return bytes(mbr)

    def write(self, fout, data):
        """Write all of *data* to *fout* and return the number of bytes.

        NOTE(review): the head of this method was lost; only its tail
        (returning ``n`` once nothing is left) was visible.  Reconstructed as
        a loop that tolerates short writes.
        """
        n = len(data)
        left = n
        while left:
            written = fout.write(data[n - left:])
            if not written:
                raise Exception("Failed to write %d bytes to file" % left)
            left -= written
            assert left >= 0
        return n

    def copy(self, fin, fout, n):
        """Copy exactly *n* bytes from *fin* to *fout* and return *n*.

        Raises ``Exception`` if *fin* ends before *n* bytes were read.
        """
        max_bufsize = 1024 * 1024
        left = n
        slog(DEBUG, "== copying ", n, "to output file offset", fout.tell())
        while left:
            slog(DEBUG, "left={}".format(left))
            bufsize = min(left, max_bufsize)
            data = fin.read(bufsize)
            if not data:
                raise Exception("Failed to read %d bytes from file" % bufsize)
            self.write(fout, data)
            left -= len(data)
            assert left >= 0
        return n

    async def run(self, args):
        """Build the disk image ``args.output`` from ``args.input``.

        Reads ``args.mtdparts``, ``args.input``, ``args.output``,
        ``args.pad_byte`` and ``args.no_boot_sectors``.

        NOTE(review): the registration of these arguments is not visible in
        this file as received -- confirm it exists (it may have been part of
        the lost span).
        """
        def empty(size):
            return bytearray([self.__to_int(args.pad_byte)]) * size

        if args.mtdparts:
            if args.input == '/dev/zero':
                # No meaningful file size for this input.
                total_size = None
            else:
                total_size = os.stat(args.input).st_size
            chunks = self.__parse_mtdparts(args.mtdparts, total_size=total_size)
        # else if args.xxx: possible other partition layout definition formats go here
        else:
            raise Exception("No memory layout definition given")

        # Pass 1: place the partitions.
        pos = 0
        parts = []
        for chunk in chunks:
            part = self.Part(chunk)
            part.addr = self.align_to_cylinder(pos, False)
            parts.append(part)
            pos = part.addr + part.aligned_size + BPS

        # Pass 2: write MBR, padding and partition data sequentially.
        pos = 0
        with open(args.output, mode='wb') as fout, open(args.input, mode='rb') as fin:
            pos += self.write(fout, self.create_mbr(parts))
            for part in parts:
                if pos < part.addr:
                    pos += self.write(fout, empty(part.addr - pos))
                if not args.no_boot_sectors:
                    pos += self.write(fout, empty(BPS))  # empty boot sector
                pos += self.copy(fin, fout, part.size)
                pad = part.aligned_size - part.size
                assert pad >= 0
                if pad > 0:
                    pos += self.write(fout, empty(pad))
        return 0


# Deliberately unguarded: the original ran the sub-command dispatcher at
# module level, and that behaviour is kept as-is.
sys.exit(jwutils.run_sub_commands('File utilities') != 0)