#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2017 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
# Copyright (c) 2012, 2013, 2014 Martin Raspaud
# Author(s):
# Martin Raspaud <martin.raspaud@smhi.se>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Reader for eps level 1b data. Uses xml files as a format description.
"""
import logging
import os
import numpy as np
import xarray as xr
import dask.array as da
from dask.delayed import delayed
from pyresample.geometry import SwathDefinition
from satpy.config import CONFIG_PATH
from satpy.readers.file_handlers import BaseFileHandler
from satpy.readers.xmlformat import XMLFormat
from satpy import CHUNK_SIZE
LOG = logging.getLogger(__name__)
C1 = 1.191062e-05 # mW/(m2*sr*cm-4)
C2 = 1.4387863 # K/cm-1
[docs]def radiance_to_bt(arr, wc_, a__, b__):
"""Convert to BT.
"""
return a__ + b__ * (C2 * wc_ / (da.log(1 + (C1 * (wc_ ** 3) / arr))))
[docs]def radiance_to_refl(arr, solar_flux):
"""Convert to reflectances.
"""
return arr * np.pi * 100.0 / solar_flux
record_class = ["Reserved", "mphr", "sphr",
"ipr", "geadr", "giadr",
"veadr", "viadr", "mdr"]
[docs]def read_raw(filename):
"""Read *filename* without scaling it afterwards.
"""
form = XMLFormat(os.path.join(CONFIG_PATH, "eps_avhrrl1b_6.5.xml"))
grh_dtype = np.dtype([("record_class", "|i1"),
("INSTRUMENT_GROUP", "|i1"),
("RECORD_SUBCLASS", "|i1"),
("RECORD_SUBCLASS_VERSION", "|i1"),
("RECORD_SIZE", ">u4"),
("RECORD_START_TIME", "S6"),
("RECORD_STOP_TIME", "S6")])
dtypes = []
cnt = 0
with open(filename, "rb") as fdes:
while True:
grh = np.fromfile(fdes, grh_dtype, 1)
if not grh:
break
rec_class = record_class[int(grh["record_class"])]
sub_class = grh["RECORD_SUBCLASS"][0]
expected_size = int(grh["RECORD_SIZE"])
bare_size = expected_size - grh_dtype.itemsize
try:
the_type = form.dtype((rec_class, sub_class))
the_descr = grh_dtype.descr + the_type.descr
except KeyError:
the_type = np.dtype([('unknown', 'V%d' % bare_size)])
the_descr = grh_dtype.descr + the_type.descr
the_type = np.dtype(the_descr)
if the_type.itemsize < expected_size:
padding = [('unknown%d' % cnt, 'V%d' % (expected_size - the_type.itemsize))]
cnt += 1
the_descr += padding
dtypes.append(np.dtype(the_descr))
fdes.seek(expected_size - grh_dtype.itemsize, 1)
file_dtype = np.dtype([(str(num), the_dtype) for num, the_dtype in enumerate(dtypes)])
records = np.memmap(fdes, mode='r', dtype=file_dtype, shape=1)[0]
return records, form
[docs]def create_xarray(arr):
res = arr
res = xr.DataArray(res, dims=['y', 'x'])
return res
[docs]class EPSAVHRRFile(BaseFileHandler):
"""Eps level 1b reader for AVHRR data.
"""
spacecrafts = {"M01": "Metop-B",
"M02": "Metop-A",
"M03": "Metop-C", }
sensors = {"AVHR": "avhrr-3"}
def __init__(self, filename, filename_info, filetype_info):
super(EPSAVHRRFile, self).__init__(
filename, filename_info, filetype_info)
self.lons, self.lats = None, None
self.sun_azi, self.sun_zen, self.sat_azi, self.sat_zen = None, None, None, None
self.area = None
self.three_a_mask, self.three_b_mask = None, None
self._start_time = filename_info['start_time']
self._end_time = filename_info['end_time']
self.records = None
self.form = None
self.mdrs = None
self.scanlines = None
self.pixels = None
self.sections = None
def _read_all(self, filename):
LOG.debug("Reading %s", filename)
self.records, self.form = read_raw(filename)
self.mdrs = [record
for record in self.records
if record_class[record['record_class']] == "mdr"]
self.iprs = [record
for record in self.records
if record_class[record['record_class']] == "ipr"]
self.scanlines = len(self.mdrs)
self.sections = {("mdr", 2): np.hstack(self.mdrs)}
self.sections[("ipr", 0)] = np.hstack(self.iprs)
for record in self.records:
rec_class = record_class[record['record_class']]
sub_class = record["RECORD_SUBCLASS"]
if rec_class in ["mdr", "ipr"]:
continue
if (rec_class, sub_class) in self.sections:
raise ValueError("Too many " + str((rec_class, sub_class)))
else:
self.sections[(rec_class, sub_class)] = record
self.pixels = self["EARTH_VIEWS_PER_SCANLINE"]
def __getitem__(self, key):
for altkey in self.form.scales.keys():
try:
try:
return (da.from_array(self.sections[altkey][key], chunks=CHUNK_SIZE)
* self.form.scales[altkey][key])
except TypeError:
val = self.sections[altkey][key].decode().split("=")[1]
try:
return float(val) * self.form.scales[altkey][key]
except ValueError:
return val.strip()
except (KeyError, ValueError):
continue
raise KeyError("No matching value for " + str(key))
[docs] def keys(self):
"""List of reader's keys.
"""
keys = []
for val in self.form.scales.values():
keys += val.dtype.fields.keys()
return keys
@delayed(nout=2, pure=True)
def _get_full_lonlats(self):
lats = np.hstack((self["EARTH_LOCATION_FIRST"][:, [0]],
self["EARTH_LOCATIONS"][:, :, 0],
self["EARTH_LOCATION_LAST"][:, [0]]))
lons = np.hstack((self["EARTH_LOCATION_FIRST"][:, [1]],
self["EARTH_LOCATIONS"][:, :, 1],
self["EARTH_LOCATION_LAST"][:, [1]]))
nav_sample_rate = self["NAV_SAMPLE_RATE"]
earth_views_per_scanline = self["EARTH_VIEWS_PER_SCANLINE"]
if nav_sample_rate == 20 and earth_views_per_scanline == 2048:
from geotiepoints import metop20kmto1km
return metop20kmto1km(lons, lats)
else:
raise NotImplementedError("Lon/lat expansion not implemented for " +
"sample rate = " + str(nav_sample_rate) +
" and earth views = " +
str(earth_views_per_scanline))
[docs] def get_full_lonlats(self):
"""Get the interpolated lons/lats.
"""
if self.lons is not None and self.lats is not None:
return self.lons, self.lats
self.lons, self.lats = self._get_full_lonlats()
self.lons = da.from_delayed(self.lons, dtype=self["EARTH_LOCATIONS"].dtype,
shape=(self.scanlines, self.pixels))
self.lats = da.from_delayed(self.lats, dtype=self["EARTH_LOCATIONS"].dtype,
shape=(self.scanlines, self.pixels))
return self.lons, self.lats
@delayed(nout=4, pure=True)
def _get_full_angles(self):
solar_zenith = np.hstack((self["ANGULAR_RELATIONS_FIRST"][:, [0]],
self["ANGULAR_RELATIONS"][:, :, 0],
self["ANGULAR_RELATIONS_LAST"][:, [0]]))
sat_zenith = np.hstack((self["ANGULAR_RELATIONS_FIRST"][:, [1]],
self["ANGULAR_RELATIONS"][:, :, 1],
self["ANGULAR_RELATIONS_LAST"][:, [1]]))
solar_azimuth = np.hstack((self["ANGULAR_RELATIONS_FIRST"][:, [2]],
self["ANGULAR_RELATIONS"][:, :, 2],
self["ANGULAR_RELATIONS_LAST"][:, [2]]))
sat_azimuth = np.hstack((self["ANGULAR_RELATIONS_FIRST"][:, [3]],
self["ANGULAR_RELATIONS"][:, :, 3],
self["ANGULAR_RELATIONS_LAST"][:, [3]]))
nav_sample_rate = self["NAV_SAMPLE_RATE"]
earth_views_per_scanline = self["EARTH_VIEWS_PER_SCANLINE"]
if nav_sample_rate == 20 and earth_views_per_scanline == 2048:
from geotiepoints import metop20kmto1km
# Note: interpolation asumes lat values values between -90 and 90
# Solar and satellite zenith is between 0 and 180.
solar_zenith -= 90
sun_azi, sun_zen = metop20kmto1km(
solar_azimuth, solar_zenith)
sun_zen += 90
sat_zenith -= 90
sat_azi, sat_zen = metop20kmto1km(
sat_azimuth, sat_zenith)
sat_zen += 90
return sun_azi, sun_zen, sat_azi, sat_zen
else:
raise NotImplementedError("Angles expansion not implemented for " +
"sample rate = " + str(nav_sample_rate) +
" and earth views = " +
str(earth_views_per_scanline))
[docs] def get_full_angles(self):
"""Get the interpolated lons/lats.
"""
if (self.sun_azi is not None and self.sun_zen is not None and
self.sat_azi is not None and self.sat_zen is not None):
return self.sun_azi, self.sun_zen, self.sat_azi, self.sat_zen
self.sun_azi, self.sun_zen, self.sat_azi, self.sat_zen = self._get_full_angles()
self.sun_azi = da.from_delayed(self.sun_azi, dtype=self["ANGULAR_RELATIONS"].dtype,
shape=(self.scanlines, self.pixels))
self.sun_zen = da.from_delayed(self.sun_zen, dtype=self["ANGULAR_RELATIONS"].dtype,
shape=(self.scanlines, self.pixels))
self.sat_azi = da.from_delayed(self.sat_azi, dtype=self["ANGULAR_RELATIONS"].dtype,
shape=(self.scanlines, self.pixels))
self.sat_zen = da.from_delayed(self.sat_zen, dtype=self["ANGULAR_RELATIONS"].dtype,
shape=(self.scanlines, self.pixels))
return self.sun_azi, self.sun_zen, self.sat_azi, self.sat_zen
[docs] def get_bounding_box(self):
if self.mdrs is None:
self._read_all(self.filename)
lats = np.hstack([self["EARTH_LOCATION_FIRST"][0, [0]],
self["EARTH_LOCATION_LAST"][0, [0]],
self["EARTH_LOCATION_LAST"][-1, [0]],
self["EARTH_LOCATION_FIRST"][-1, [0]]])
lons = np.hstack([self["EARTH_LOCATION_FIRST"][0, [1]],
self["EARTH_LOCATION_LAST"][0, [1]],
self["EARTH_LOCATION_LAST"][-1, [1]],
self["EARTH_LOCATION_FIRST"][-1, [1]]])
return lons.ravel(), lats.ravel()
[docs] def get_dataset(self, key, info):
"""Get calibrated channel data."""
if self.mdrs is None:
self._read_all(self.filename)
if key.name in ['longitude', 'latitude']:
lons, lats = self.get_full_lonlats()
if key.name == 'longitude':
dataset = create_xarray(lons)
else:
dataset = create_xarray(lats)
elif key.name in ['solar_zenith_angle', 'solar_azimuth_angle',
'satellite_zenith_angle', 'satellite_azimuth_angle']:
sun_azi, sun_zen, sat_azi, sat_zen = self.get_full_angles()
if key.name == 'solar_zenith_angle':
dataset = create_xarray(sun_zen)
elif key.name == 'solar_azimuth_angle':
dataset = create_xarray(sun_azi)
if key.name == 'satellite_zenith_angle':
dataset = create_xarray(sat_zen)
elif key.name == 'satellite_azimuth_angle':
dataset = create_xarray(sat_azi)
else:
mask = None
if key.calibration == 'counts':
raise ValueError('calibration=counts is not supported! ' +
'This reader cannot return counts')
elif key.calibration not in ['reflectance', 'brightness_temperature', 'radiance']:
raise ValueError('calibration type ' + str(key.calibration) +
' is not supported!')
if key.name in ['3A', '3a'] and self.three_a_mask is None:
self.three_a_mask = ((self["FRAME_INDICATOR"] & 2 ** 16) != 2 ** 16)
if key.name in ['3B', '3b'] and self.three_b_mask is None:
self.three_b_mask = ((self["FRAME_INDICATOR"] & 2 ** 16) != 0)
if key.name not in ["1", "2", "3a", "3A", "3b", "3B", "4", "5"]:
LOG.info("Can't load channel in eps_l1b: " + str(key.name))
return
if key.name == "1":
if key.calibration == 'reflectance':
array = radiance_to_refl(self["SCENE_RADIANCES"][:, 0, :],
self["CH1_SOLAR_FILTERED_IRRADIANCE"])
else:
array = self["SCENE_RADIANCES"][:, 0, :]
if key.name == "2":
if key.calibration == 'reflectance':
array = radiance_to_refl(self["SCENE_RADIANCES"][:, 1, :],
self["CH2_SOLAR_FILTERED_IRRADIANCE"])
else:
array = self["SCENE_RADIANCES"][:, 1, :]
if key.name.lower() == "3a":
if key.calibration == 'reflectance':
array = radiance_to_refl(self["SCENE_RADIANCES"][:, 2, :],
self["CH3A_SOLAR_FILTERED_IRRADIANCE"])
else:
array = self["SCENE_RADIANCES"][:, 2, :]
mask = np.empty(array.shape, dtype=bool)
mask[:, :] = self.three_a_mask[:, np.newaxis]
if key.name.lower() == "3b":
if key.calibration == 'brightness_temperature':
array = radiance_to_bt(self["SCENE_RADIANCES"][:, 2, :],
self["CH3B_CENTRAL_WAVENUMBER"],
self["CH3B_CONSTANT1"],
self["CH3B_CONSTANT2_SLOPE"])
else:
array = self["SCENE_RADIANCES"][:, 2, :]
mask = np.empty(array.shape, dtype=bool)
mask[:, :] = self.three_b_mask[:, np.newaxis]
if key.name == "4":
if key.calibration == 'brightness_temperature':
array = radiance_to_bt(self["SCENE_RADIANCES"][:, 3, :],
self["CH4_CENTRAL_WAVENUMBER"],
self["CH4_CONSTANT1"],
self["CH4_CONSTANT2_SLOPE"])
else:
array = self["SCENE_RADIANCES"][:, 3, :]
if key.name == "5":
if key.calibration == 'brightness_temperature':
array = radiance_to_bt(self["SCENE_RADIANCES"][:, 4, :],
self["CH5_CENTRAL_WAVENUMBER"],
self["CH5_CONSTANT1"],
self["CH5_CONSTANT2_SLOPE"])
else:
array = self["SCENE_RADIANCES"][:, 4, :]
dataset = create_xarray(array)
if mask is not None:
dataset = dataset.where(~mask)
dataset.attrs['platform_name'] = self.platform_name
dataset.attrs['sensor'] = self.sensor_name
dataset.attrs.update(info)
dataset.attrs.update(key.to_dict())
return dataset
[docs] def get_lonlats(self):
if self.area is None:
if self.lons is None or self.lats is None:
self.lons, self.lats = self.get_full_lonlats()
self.area = SwathDefinition(self.lons, self.lats)
self.area.name = '_'.join([self.platform_name, str(self.start_time),
str(self.end_time)])
return self.area
@property
def platform_name(self):
return self.spacecrafts[self["SPACECRAFT_ID"]]
@property
def sensor_name(self):
return self.sensors[self["INSTRUMENT_ID"]]
@property
def start_time(self):
# return datetime.strptime(self["SENSING_START"], "%Y%m%d%H%M%SZ")
return self._start_time
@property
def end_time(self):
# return datetime.strptime(self["SENSING_END"], "%Y%m%d%H%M%SZ")
return self._end_time
if __name__ == '__main__':
def norm255(a__):
"""normalize array to uint8.
"""
arr = a__ * 1.0
arr = (arr - arr.min()) * 255.0 / (arr.max() - arr.min())
return arr.astype(np.uint8)
def show(a__):
"""show array.
"""
from PIL import Image
Image.fromarray(norm255(a__), "L").show()
import sys
res = read_raw(sys.argv[1])