312 lines
9.0 KiB
Python
312 lines
9.0 KiB
Python
from twisted.internet.protocol import DatagramProtocol
|
|
from twisted.internet import reactor, task
|
|
from twisted.internet.task import deferLater
|
|
|
|
import os
|
|
import time
|
|
import struct
|
|
import sys
|
|
import hashlib
|
|
import operator
|
|
import functools
|
|
import random
|
|
|
|
class Transporter(DatagramProtocol):
|
|
|
|
def __init__(self, jobs):
|
|
self.udp_transport = reactor.listenUDP(9947, self)
|
|
self.pending = {}
|
|
self.seq = 0
|
|
self.jobs = jobs
|
|
self.firstjob()
|
|
task.LoopingCall(self.earliest).start(0.1)
|
|
reactor.run()
|
|
|
|
def firstjob(self):
|
|
self.jobs[0].startwork(self)
|
|
|
|
def propose(self, cmd, rest):
|
|
seq = self.seq
|
|
self.seq += 1
|
|
data = struct.pack(">HH", seq, cmd) + rest;
|
|
self.pending[seq] = (time.time(), data)
|
|
return seq
|
|
|
|
def earliest(self):
|
|
bytime = [(t, k) for (k, (t, _)) in self.pending.items()]
|
|
for (t, seq) in sorted(bytime)[:32]:
|
|
self.send(seq)
|
|
self.pending[seq] = (time.time(), self.pending[seq][1])
|
|
|
|
def datagramReceived(self, data, (host, port)):
|
|
# print "received %r from %s:%d" % (data, host, port)
|
|
(opcode, seq) = struct.unpack(">HH", data[:4])
|
|
assert opcode == 0
|
|
if seq in self.pending:
|
|
del self.pending[seq]
|
|
try:
|
|
self.jobs[0].addresult(self, seq, data[4:])
|
|
except AssertionError as e:
|
|
print 'assertion failed', e
|
|
reactor.stop()
|
|
return
|
|
print "ACK ", seq, "pending", len(self.pending)
|
|
if len(self.pending) == 0:
|
|
self.jobs[0].close()
|
|
self.jobs = self.jobs[1:]
|
|
if self.jobs != []:
|
|
self.firstjob()
|
|
else:
|
|
reactor.stop()
|
|
# self.transport.write(data, (host, port))
|
|
|
|
def send(self, seq):
|
|
(_, data) = self.pending[seq]
|
|
# print "send %r" % data
|
|
self.udp_transport.write(data, ("192.168.0.99", 947))
|
|
|
|
def addresult(self, seq, payload):
|
|
pass
|
|
|
|
|
|
class Action(object):
|
|
def addresult(self, tr, seq, payload):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
class ReadRAM(Action):
|
|
|
|
def startwork(self, tr):
|
|
self.result = 16384 * [None]
|
|
self.seqs = {}
|
|
for i in range(0, 128):
|
|
self.seqs[tr.propose(0, struct.pack(">H", i * 128))] = i * 128
|
|
|
|
def addresult(self, tr, seq, payload):
|
|
addr = self.seqs[seq]
|
|
assert len(payload) == 128
|
|
for i in range(128):
|
|
self.result[addr + i] = ord(payload[i])
|
|
|
|
def close(self):
|
|
for a in range(0, 16384, 16):
|
|
print ("%04x " % a) + " ".join("%02x" % x for x in self.result[a:a+16])
|
|
|
|
|
|
class WriteRAM(Action):
|
|
|
|
def startwork(self, tr):
|
|
code = open('j1.bin').read()
|
|
for i in range(0x1f80 / 128):
|
|
print i
|
|
o = 128 * i
|
|
tr.propose(1, struct.pack(">H128s", 0x2000 + o, code[o:o+128]))
|
|
|
|
class VerifyRAM(ReadRAM):
|
|
def close(self):
|
|
actual = "".join([chr(c) for c in self.result[0x2000:]])
|
|
expected = open('j1.bin').read()
|
|
l = 0x1f80
|
|
assert actual[:l] == expected[:l]
|
|
|
|
class Reboot(Action):
|
|
def startwork(self, tr):
|
|
tr.propose(2, "")
|
|
|
|
class ReadFlash(Action):
|
|
|
|
def startwork(self, tr):
|
|
self.result = 2 * 1024 * 1024 * [None]
|
|
self.seqs = {}
|
|
for addr in range(0, len(self.result), 128):
|
|
self.seqs[tr.propose(3, struct.pack(">I", addr))] = addr
|
|
|
|
def addresult(self, tr, seq, payload):
|
|
addr = self.seqs[seq]
|
|
assert len(payload) == 128
|
|
for i in range(128):
|
|
self.result[addr + i] = ord(payload[i])
|
|
|
|
def close(self):
|
|
open('flash.dump', 'w').write("".join([chr(x) for x in self.result]))
|
|
for a in range(0, 256, 16):
|
|
print ("%04x " % a) + " ".join("%02x" % x for x in self.result[a:a+16])
|
|
|
|
class EraseFlash(Action):
|
|
def startwork(self, tr):
|
|
tr.propose(4, "")
|
|
def close(self):
|
|
time.sleep(5)
|
|
|
|
class WaitFlash(Action):
|
|
def startwork(self, tr):
|
|
self.seq = tr.propose(5, struct.pack(">I", 0))
|
|
def addresult(self, tr, seq, payload):
|
|
(res,) = struct.unpack(">H", payload)
|
|
if res == 0:
|
|
self.startwork(tr)
|
|
|
|
def bitload(bitfilename):
|
|
bit = open(bitfilename, "r")
|
|
|
|
def getH(fi):
|
|
return struct.unpack(">H", bit.read(2))[0]
|
|
def getI(fi):
|
|
return struct.unpack(">I", bit.read(4))[0]
|
|
|
|
bit.seek(getH(bit), os.SEEK_CUR)
|
|
assert getH(bit) == 1
|
|
|
|
# Search for the data section in the .bit file...
|
|
while True:
|
|
ty = ord(bit.read(1))
|
|
if ty == 0x65:
|
|
break
|
|
length = getH(bit)
|
|
bit.seek(length, os.SEEK_CUR)
|
|
fieldLength = getI(bit)
|
|
return bit.read(fieldLength)
|
|
|
|
# open("xxx", "w").write(bitload("j1_program.bit"))
|
|
|
|
import intelhex
|
|
import array
|
|
|
|
class Hexfile(object):
|
|
def __init__(self, filename):
|
|
self.hf = intelhex.IntelHex(filename)
|
|
self.hf.readfile()
|
|
while (self.hf.maxaddr() % 128) != 127:
|
|
self.hf[self.hf.maxaddr() + 1] = 0xff
|
|
print "%x %x" % (self.hf.minaddr(), self.hf.maxaddr())
|
|
|
|
def minmax(self):
|
|
return (self.hf.minaddr(), self.hf.maxaddr())
|
|
|
|
# The XESS CPLD bootloader runs the flash in byte mode,
|
|
# and the flash is littleendian, so must do the endian
|
|
# swap here
|
|
def blk(self, o):
|
|
b128 = array.array('B', [self.hf[o + i] for i in range(128)]).tostring()
|
|
hh = array.array('H', b128)
|
|
hh.byteswap()
|
|
return hh.tostring()
|
|
|
|
class WriteFlash(Action, Hexfile):
|
|
|
|
def startwork(self, tr):
|
|
for o in range(self.hf.minaddr(), self.hf.maxaddr(), 128):
|
|
tr.propose(6, struct.pack(">I", o) + self.blk(o))
|
|
|
|
class VerifyFlash(Action, Hexfile):
|
|
|
|
def startwork(self, tr):
|
|
self.seqs = {}
|
|
for o in range(self.hf.minaddr(), self.hf.maxaddr(), 128):
|
|
self.seqs[tr.propose(3, struct.pack(">I", o))] = o
|
|
|
|
def addresult(self, tr, seq, payload):
|
|
addr = self.seqs[seq]
|
|
assert len(payload) == 128, 'short packet'
|
|
assert self.blk(addr) == payload, "mismatch at %#x" % addr
|
|
|
|
def close(self):
|
|
print "Flash verified OK"
|
|
|
|
class EraseSector(Action):
|
|
def __init__(self, a):
|
|
self.a = a
|
|
def startwork(self, tr):
|
|
tr.propose(7, struct.pack(">I", self.a))
|
|
def close(self):
|
|
time.sleep(.1)
|
|
|
|
class WaitSector(Action):
|
|
def __init__(self, a):
|
|
self.a = a
|
|
def startwork(self, tr):
|
|
self.seq = tr.propose(5, struct.pack(">I", self.a))
|
|
def addresult(self, tr, seq, payload):
|
|
(res,) = struct.unpack(">H", payload)
|
|
if res == 0:
|
|
self.startwork(tr)
|
|
|
|
class LoadSector(Action):
|
|
def __init__(self, a, data):
|
|
self.a = a
|
|
self.data = data
|
|
def startwork(self, tr):
|
|
for o in range(0, len(self.data), 128):
|
|
blk = self.data[o:o+128]
|
|
if blk != (128 * chr(0xff)):
|
|
tr.propose(6, struct.pack(">I", self.a + o) + blk)
|
|
|
|
class DumpSector(Action):
|
|
|
|
def __init__(self, a):
|
|
self.a = a
|
|
def startwork(self, tr):
|
|
self.seqs = {}
|
|
for o in [0]:
|
|
self.seqs[tr.propose(3, struct.pack(">I", self.a + o))] = o
|
|
|
|
def addresult(self, tr, seq, payload):
|
|
addr = self.a + self.seqs[seq]
|
|
assert len(payload) == 128
|
|
print "result", repr(payload)
|
|
|
|
# t = Transporter([WriteRAM(), VerifyRAM(), Reboot()])
|
|
# t = Transporter([EraseFlash(), WaitFlash()])
|
|
# sys.exit(0)
|
|
|
|
erasing = [EraseFlash(), WaitFlash()]
|
|
bases = [ 0 ]
|
|
bases = [0, 0x80000, 0x100000, 0x180000]
|
|
bases = [0x80000]
|
|
# Transporter(erasing + [WriteFlash("j1_program_%x.mcs" % base) for base in bases])
|
|
# Transporter([VerifyFlash("j1_program_%x.mcs" % base) for base in bases])
|
|
# Transporter([EraseSector(seca), WaitSector(seca), ld, DumpSector(seca)])
|
|
|
|
def loadcode(dsta, filenames):
|
|
data = "".join([open(fn).read() for fn in filenames])
|
|
return [EraseSector(dsta),
|
|
WaitSector(dsta),
|
|
LoadSector(dsta, data)]
|
|
|
|
def pngstr(filename):
|
|
import Image
|
|
sa = array.array('B', Image.open(filename).convert("L").tostring())
|
|
return struct.pack('>1024H', *sa.tolist())
|
|
|
|
def erasesecs(lo, hi):
|
|
r = []
|
|
for s in range(lo, hi, 65536):
|
|
r += [EraseSector(s), WaitSector(s)]
|
|
return r
|
|
|
|
def loadhex(filename):
|
|
w = WriteFlash(filename)
|
|
(lo, hi) = w.minmax()
|
|
return erasesecs(lo, hi) + [w]
|
|
|
|
def loadsprites(dsta, filenames):
|
|
data = "".join([pngstr(f) for f in filenames])
|
|
print "Loading %d bytes" % len(data)
|
|
return erasesecs(dsta, dsta + len(data)) + [LoadSector(dsta, data)]
|
|
|
|
# Transporter(loadcode(0x180000, ["j1.png.pic", "font8x8", "j1.png.chr"]) + [Reboot()])
|
|
spr = ["%d.png" % (i/2) for i in range(16)]
|
|
spr += ["blob.png"] * 16
|
|
spr += ["fsm-32.png", "pop.png"] * 6 + ["bomb.png", "pop.png", "shot.png", "pop.png"]
|
|
|
|
# Transporter(loadsprites(0x200000, spr))
|
|
# Transporter(loadcode(0x190000, ["j1.bin"]) + [Reboot()])
|
|
# t = Transporter([ReadFlash()])
|
|
|
|
Transporter(
|
|
# loadhex("j1_program_80000.mcs")
|
|
loadcode(0x190000, ["j1.bin"]) + [Reboot()]
|
|
)
|