注解
单击 here 要下载完整的示例代码,请执行以下操作
解密栅格¶
这个 RasterElement 对象以WKB形式存储栅格数据。当使用栅格时,通常最好将其转换为TIFF、PNG、JPEG或其他格式。不过,可以对WKB进行解密以获得二维值列表。此示例使用SQLAlChemy ORM查询。
10 import binascii
11 import struct
12
13 import pytest
14 from sqlalchemy import Column
15 from sqlalchemy import create_engine
16 from sqlalchemy import Integer
17 from sqlalchemy import MetaData
18 from sqlalchemy.ext.declarative import declarative_base
19 from sqlalchemy.orm import sessionmaker
20
21 from geoalchemy2 import Raster, WKTElement
22
23
24 engine = create_engine('postgresql://gis:gis@localhost/gis', echo=False)
25 metadata = MetaData(engine)
26 Base = declarative_base(metadata=metadata)
27
28 session = sessionmaker(bind=engine)()
29
30
31 class Ocean(Base):
32 __tablename__ = 'ocean'
33 id = Column(Integer, primary_key=True)
34 rast = Column(Raster)
35
36 def __init__(self, rast):
37 self.rast = rast
38
39
40 def _format_e(endianess, struct_format):
41 return _ENDIANESS[endianess] + struct_format
42
43
44 def wkbHeader(raw):
45 # Function to decipher the WKB header
46 # See http://trac.osgeo.org/postgis/browser/trunk/raster/doc/RFC2-WellKnownBinaryFormat
47
48 header = {}
49
50 header['endianess'] = struct.unpack('b', raw[0:1])[0]
51
52 e = header['endianess']
53 header['version'] = struct.unpack(_format_e(e, 'H'), raw[1:3])[0]
54 header['nbands'] = struct.unpack(_format_e(e, 'H'), raw[3:5])[0]
55 header['scaleX'] = struct.unpack(_format_e(e, 'd'), raw[5:13])[0]
56 header['scaleY'] = struct.unpack(_format_e(e, 'd'), raw[13:21])[0]
57 header['ipX'] = struct.unpack(_format_e(e, 'd'), raw[21:29])[0]
58 header['ipY'] = struct.unpack(_format_e(e, 'd'), raw[29:37])[0]
59 header['skewX'] = struct.unpack(_format_e(e, 'd'), raw[37:45])[0]
60 header['skewY'] = struct.unpack(_format_e(e, 'd'), raw[45:53])[0]
61 header['srid'] = struct.unpack(_format_e(e, 'i'), raw[53:57])[0]
62 header['width'] = struct.unpack(_format_e(e, 'H'), raw[57:59])[0]
63 header['height'] = struct.unpack(_format_e(e, 'H'), raw[59:61])[0]
64
65 return header
66
67
68 def read_band(data, offset, pixtype, height, width, endianess=1):
69 ptype, _, psize = _PTYPE[pixtype]
70 pix_data = data[offset + 1: offset + 1 + width * height * psize]
71 band = [
72 [
73 struct.unpack(_format_e(endianess, ptype), pix_data[
74 (i * width + j) * psize: (i * width + j + 1) * psize
75 ])[0]
76 for j in range(width)
77 ]
78 for i in range(height)
79 ]
80 return band
81
82
83 def read_band_numpy(data, offset, pixtype, height, width, endianess=1):
84 import numpy as np # noqa
85 _, dtype, psize = _PTYPE[pixtype]
86 dt = np.dtype(dtype)
87 dt = dt.newbyteorder(_ENDIANESS[endianess])
88 band = np.frombuffer(data, dtype=dtype,
89 count=height * width, offset=offset + 1)
90 band = (np.reshape(band, ((height, width))))
91 return band
92
93
94 _PTYPE = {
95 0: ['?', '?', 1],
96 1: ['B', 'B', 1],
97 2: ['B', 'B', 1],
98 3: ['b', 'b', 1],
99 4: ['B', 'B', 1],
100 5: ['h', 'i2', 2],
101 6: ['H', 'u2', 2],
102 7: ['i', 'i4', 4],
103 8: ['I', 'u4', 4],
104 10: ['f', 'f4', 4],
105 11: ['d', 'f8', 8],
106 }
107
108 _ENDIANESS = {
109 0: '>',
110 1: '<',
111 }
112
113
114 def wkbImage(raster_data, use_numpy=False):
115 """Function to decipher the WKB raster data"""
116
117 # Get binary data
118 raw = binascii.unhexlify(raster_data)
119
120 # Read header
121 h = wkbHeader(bytes(raw))
122 e = h["endianess"]
123
124 img = [] # array to store image bands
125 offset = 61 # header raw length in bytes
126 band_size = h['width'] * h['height'] # number of pixels in each band
127
128 for i in range(h['nbands']):
129 # Determine pixtype for this band
130 pixtype = struct.unpack(_format_e(e, 'b'), raw[offset: offset + 1])[0] - 64
131
132 # Read data with either pure Python or Numpy
133 if use_numpy:
134 band = read_band_numpy(
135 raw, offset, pixtype, h['height'], h['width'])
136 else:
137 band = read_band(
138 raw, offset, pixtype, h['height'], h['width'])
139
140 # Store the result
141 img.append(band)
142 offset = offset + 2 + band_size
143
144 return img
145
146
147 class TestDecipherRaster():
148
149 def setup(self):
150 metadata.drop_all(checkfirst=True)
151 metadata.create_all()
152
153 def teardown(self):
154 session.rollback()
155 metadata.drop_all()
156
157 @pytest.mark.parametrize("pixel_type", [
158 '1BB',
159 '2BUI',
160 '4BUI',
161 '8BSI',
162 '8BUI',
163 '16BSI',
164 '16BUI',
165 '32BSI',
166 '32BUI',
167 '32BF',
168 '64BF'
169 ])
170 def test_decipher_raster(self, pixel_type):
171 """Create a raster and decipher it"""
172
173 # Create a new raster
174 polygon = WKTElement('POLYGON((0 0,1 1,0 1,0 0))', srid=4326)
175 o = Ocean(polygon.ST_AsRaster(5, 6, pixel_type))
176 session.add(o)
177 session.flush()
178
179 # Decipher data from each raster
180 image = wkbImage(o.rast.data)
181
182 # Define expected result
183 expected = [
184 [0, 1, 1, 1, 1],
185 [1, 1, 1, 1, 1],
186 [0, 1, 1, 1, 0],
187 [0, 1, 1, 0, 0],
188 [0, 1, 0, 0, 0],
189 [0, 0, 0, 0, 0]
190 ]
191
192 # Check results
193 band = image[0]
194 assert band == expected
脚本的总运行时间: (0分0.000秒)