13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def write(profiles, base_dir, verbose):
    """Write a (aprofiles.profiles.ProfilesData) instance to a daily netCDF file.

    The file is written under ``base_dir/yyyy/mm/dd/`` and named
    ``AP_{wigos_station_id}-{instrument_id}-{yyyy-mm-dd}.nc``, where the date is
    taken from the first value of the dataset's time coordinate.

    Args:
        profiles (aprofiles.profiles.ProfilesData): Object to be written.
        base_dir (str): Base path under which the file should be written.
        verbose (bool): Verbose mode. If True, warns when an existing file is
            about to be overwritten.
    """

    def _classify_scene(ds):
        # One scene code per profile, from the lowest cloud base.
        # Cloud height classes follow:
        # https://www.metoffice.gov.uk/weather/learn-about/weather/types-of-weather/clouds
        lowest_clouds = profiles._get_lowest_clouds()
        scene = []
        for i, lowest_cloud in enumerate(lowest_clouds):
            if lowest_cloud <= 1981:
                scene.append(3)  # low cloud (base below 1981 m)
            elif lowest_cloud > 1981 and lowest_cloud <= 6096:
                scene.append(2)  # mid cloud (base between 1981 m and 6096 m)
            elif lowest_cloud > 6096:
                scene.append(1)  # high cloud (base above 6096 m)
            else:
                scene.append(0)  # aerosols only (NaN comparisons land here)
            # fog or condensation overrides the cloud-based classification
            if ds.foc.data[i]:
                scene[i] = 4
        return scene

    def _classify_retrieval_scene(ds):
        # One retrieval-scene code per profile, comparing the lowest cloud base
        # to the reference altitude used for the retrieval.
        lowest_clouds = profiles._get_lowest_clouds()
        z_ref = profiles.data.z_ref.data
        scene = []
        for i, _ in enumerate(lowest_clouds):
            if lowest_clouds[i] > z_ref[i]:
                scene.append(1)  # cloud above reference altitude
            elif lowest_clouds[i] < z_ref[i]:
                scene.append(3)  # cloud below reference altitude
            else:
                scene.append(0)  # aerosols only (NaN comparisons land here)
            # fog or condensation overrides the cloud-based classification
            if ds.foc.data[i]:
                scene[i] = 4
        return scene

    # get dataset from ProfilesData
    ds = profiles.data

    # get date as string yyyy-mm-dd from the first value of the time data
    str_date = str(ds.time.values[0])[:10]
    yyyy = str_date[:4]
    mm = str_date[5:7]
    dd = str_date[8:10]
    filename = f"AP_{ds.wigos_station_id}-{ds.instrument_id}-{str_date}.nc"
    path = os.path.join(base_dir, yyyy, mm, dd, filename)

    if utils.file_exists(path) and verbose:
        warnings.warn(f"{path} already exists and will be overwritten.")
    else:
        # make sure the yyyy/mm/dd directory tree exists
        from pathlib import Path
        Path(base_dir, yyyy, mm, dd).mkdir(parents=True, exist_ok=True)

    # creates a copy of the original dataset -> writes only necessary data
    ds_towrite = copy.deepcopy(ds)

    # for the mass concentration, we just need the mec (mass-to-extinction
    # coefficient); drop the full per-type concentration variables
    mec = {}
    for data_var in list(ds_towrite.data_vars):
        if 'mass_concentration:' in data_var:
            mec[data_var.split(':')[1]] = ds[data_var].mec
            # drop_vars is the non-deprecated replacement for Dataset.drop
            ds_towrite = ds_towrite.drop_vars(data_var)

    # add mec as new dataarray
    ds_towrite["mec"] = xr.DataArray(
        data=list(mec.values()),
        dims=["aer_type"],
        coords=dict(
            aer_type=list(mec.keys()),
        ),
        attrs=dict(long_name="Mass to Extinction Coefficient", units='m2.g-1', wavelength=ds.l0_wavelength.data),
    )
    # add attributes to aer_type
    ds_towrite["aer_type"] = ds_towrite["aer_type"].assign_attrs({
        'long_name': 'Aerosol type'
    })

    # determines the scene classification for each profile
    scene = _classify_scene(ds_towrite)
    # add scene as new dataarray
    ds_towrite["scene"] = ("time", scene)
    ds_towrite["scene"] = ds_towrite["scene"].assign_attrs({
        'long_name': "Scene classification",
        'definition': '0: aer (aerosols only) / 1: high_cloud (base cloud above 6096 m) / 2: mid_cloud (base cloud between 1981 m and 6096 m) / 3: low-cloud (base cloud below 1981 m) - 4: foc (fog or condensation)'
    })

    # add scene for extinction profile: cloud_above, cloud_below
    retrieval_scene = _classify_retrieval_scene(ds_towrite)
    # add scene as new dataarray
    ds_towrite["retrieval_scene"] = ("time", retrieval_scene)
    ds_towrite["retrieval_scene"] = ds_towrite["retrieval_scene"].assign_attrs({
        'long_name': "Retrieval scene classification",
        'definition': '0: aer (aerosols only) / 1: cloud_above (cloud above reference altitude) / 3: cloud_below (cloud below reference point) / 4: foc (fog or condensation)'
    })

    # drop other variables not needed in the output file
    drop_variables = ['start_time', 'cloud_base_height', 'vertical_visibility', 'cbh_uncertainties', 'uncertainties_att_backscatter_0']
    for drop_var in drop_variables:
        ds_towrite = ds_towrite.drop_vars(drop_var)

    # some variables have no dimension: set them as attributes and drop the variable
    nodim_variables = ['l0_wavelength', 'station_latitude', 'station_longitude', 'station_altitude']
    for nodim_var in nodim_variables:
        ds_towrite.attrs[nodim_var] = ds_towrite[nodim_var].data
        ds_towrite = ds_towrite.drop_vars(nodim_var)

    # add altitude direction (CF convention for vertical axes)
    ds_towrite["altitude"] = ds_towrite["altitude"].assign_attrs({
        'positive': "up"
    })

    # encoding with compression; downcast int64 to int32 for netCDF friendliness
    encoding = {}
    for varname, var in ds_towrite.variables.items():
        if varname == "time":
            continue
        if var.dtype == np.int64:
            encoding[varname] = {"dtype": np.int32, "zlib": True, "chunksizes": var.shape}
        if varname in ["extinction", "clouds_bases", "clouds_peaks", "clouds_tops"]:
            # NOTE: overwrites any int64 entry above for these names (original behavior)
            encoding[varname] = {"zlib": True, "chunksizes": var.shape}

    # convert the quality_flag variable's flag_values attribute to NC_INT instead of NC_INT64
    ds_towrite["quality_flag"] = ds_towrite.quality_flag.assign_attrs({'flag_values': np.array([0, 1, 2], dtype=np.int32)})

    # writes to netcdf
    ds_towrite.to_netcdf(path, mode='w', encoding=encoding)
|