-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatainputstream.py
executable file
·81 lines (61 loc) · 2.3 KB
/
datainputstream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Dec 7 11:36:27 2017
@author: kpalmer
Reading from Java DataInputStream format.
From https://github.com/arngarden/python_java_datastream
This uses big endian (network) format.
"""
import struct
class DataInputStream:
    """Read primitive values from a binary stream written by Java's DataOutputStream.

    Every ``read_*`` helper delegates to :meth:`read_record`, which decodes
    with the ``struct`` module using big-endian (network) byte order unless
    told otherwise.
    """

    def __init__(self, stream):
        """Wrap *stream*, any object with a ``read(nbytes)`` method returning bytes."""
        self.stream = stream

    def read_record(self, format, n=1, order=">"):
        """Read and unpack *n* consecutive records from the stream.

        format - struct format of one record (e.g. 'dd', see struct.unpack)
        n - number of records
        order - ">" network/big-endian (default) or "<" little endian

        Returns a flat tuple holding the unpacked values of all records.

        Raises EOFError if the stream is already exhausted, and ValueError
        if it ends part-way through the requested data.
        """
        # Build up format string and figure out how large it is
        unpackstr = order + format * n
        nbytes = struct.calcsize(unpackstr)
        data = self.stream.read(nbytes)
        if len(data) < nbytes:
            if not data:
                raise EOFError  # no data, end of file
            # We didn't get all the data we expected, but we got some
            raise ValueError("Expected {} bytes, read {} before EOF".format(
                nbytes, len(data)))
        return struct.unpack(unpackstr, data)

    def read_n_doubles_and_bools(self, ndoubles=0, nbools=0, n=1):
        """Read *n* records, each made of *ndoubles* doubles then *nbools* booleans.

        Returns a flat tuple of all values in stream order.
        """
        unpack_fmt_string = 'd' * ndoubles + '?' * nbools
        return self.read_record(unpack_fmt_string, n)

    def read_boolean(self):
        """Read one byte and return it as a bool."""
        return self.read_record("?")[0]

    def read_byte(self):
        """Read a signed 8-bit integer."""
        return self.read_record("b")[0]

    def read_unsigned_byte(self):
        """Read an unsigned 8-bit integer."""
        return self.read_record("B")[0]

    def read_char(self):
        """Read a 16-bit unsigned value and return it as an int (not a str)."""
        return self.read_record("H")[0]

    def read_double(self):
        """Read a 64-bit IEEE 754 double."""
        # struct's format character for a double is lowercase "d"; the
        # uppercase "D" is not a valid struct format and raised struct.error.
        return self.read_record("d")[0]

    def read_float(self):
        """Read a 32-bit IEEE 754 float."""
        return self.read_record("f")[0]

    def read_short(self):
        """Read a signed 16-bit integer."""
        return self.read_record("h")[0]

    def read_unsigned_short(self):
        """Read an unsigned 16-bit integer."""
        return self.read_record("H")[0]

    def read_long(self):
        """Read a signed 64-bit integer."""
        return self.read_record("q")[0]

    def read_utf(self):
        """Read a length-prefixed string and return its raw bytes.

        The stream holds an unsigned 16-bit big-endian byte count followed
        by that many payload bytes. The payload is returned undecoded as
        ``bytes``; Java's writeUTF emits "modified UTF-8", so callers that
        need text must decode it themselves (plain ``.decode("utf-8")`` is
        only guaranteed to work for strings without NUL or supplementary
        characters).

        Raises EOFError if the stream is exhausted before the length prefix,
        and ValueError if the prefix or the payload is truncated.
        """
        # Go through read_record so EOF on the length prefix is reported the
        # same way as for every other read_* method.
        utf_length = self.read_record("H")[0]
        data = self.stream.read(utf_length)
        if len(data) < utf_length:
            raise ValueError("Expected {} bytes, read {} before EOF".format(
                utf_length, len(data)))
        return data

    def read_int(self):
        """Read a signed 32-bit integer."""
        return self.read_record("i")[0]

    def read_unsigned_int(self):
        """Read an unsigned 32-bit integer."""
        return self.read_record("I")[0]