Skip to content

Writing

This module is used to write data from an instance of the ProfilesData class as a NetCDF file.

write(profiles, base_dir, verbose)

Writing method for an instance of a (aprofiles.profiles.ProfilesData): class.

Parameters:

Name Type Description Default
profiles ProfilesData

Object to be written.

required
base_dir str

Base path of the file should be written.

required
verbose bool

Verbose mode.

required
Source code in aprofiles/io/write_profiles.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def write(profiles, base_dir, verbose):
    """
    Writing method for an instance of a (aprofiles.profiles.ProfilesData): class.

    Args:
        profiles (aprofiles.profiles.ProfilesData): Object to be written.
        base_dir (str): Base path of the file should be written.
        verbose (bool): Verbose mode.
    """

    def _classify_scene(ds):
        lowest_clouds = profiles._get_lowest_clouds()
        scene = []
        # got clouds classification here: https://www.metoffice.gov.uk/weather/learn-about/weather/types-of-weather/clouds
        for i, lowest_cloud in enumerate(lowest_clouds):
            if lowest_cloud<=1981:
                scene.append(3)
            elif lowest_cloud>1981 and lowest_cloud<=6096:
                scene.append(2)
            elif lowest_cloud>6096:
                scene.append(1)
            else:
                scene.append(0)
            # overwrite result based on foc
            if ds.foc.data[i]:
                scene[i] = 4
        return scene

    def _classify_retrieval_scene(ds):
        lowest_clouds = profiles._get_lowest_clouds()
        z_ref = profiles.data.z_ref.data
        scene = []
        for i, _ in enumerate(lowest_clouds):
            if lowest_clouds[i]>z_ref[i]:
                scene.append(1)
            elif lowest_clouds[i]<z_ref[i]:
                scene.append(3)
            else:
                scene.append(0)
            # overwrite result based on foc
            if ds.foc.data[i]:
                scene[i] = 4
        return scene

    # get dataset from profilesdata
    ds = profiles.data

    #get date as string yyyy-mm-dd from first value of the time data
    str_date = str(ds.time.values[0])[:10]
    yyyy = str_date[:4]
    mm = str_date[5:7]
    dd = str_date[8:10]
    filename = f"AP_{ds.wigos_station_id}-{ds.instrument_id}-{str_date}.nc"
    path = os.path.join(base_dir, yyyy, mm, dd, filename)

    if utils.file_exists(path) and verbose:
        warnings.warn(f"{path} already exists and will be overwritten.")
    else:
        from pathlib import Path
        Path(base_dir, yyyy, mm, dd).mkdir(parents=True, exist_ok=True)

    # creates a copy od original dataset -> writes only necessary data
    ds_towrite = copy.deepcopy(ds)

    # for the mass concentration, we just need the emc.
    emc = {}
    for data_var in list(ds_towrite.data_vars):
        if 'mass_concentration:' in data_var:
            emc[data_var.split(':')[1]] = ds[data_var].emc
            ds_towrite = ds_towrite.drop(data_var)

    # add emc as new dataarray
    ds_towrite["emc"] = xr.DataArray(
        data=list(emc.values()),
        dims=["aer_type"],
        coords=dict(
            aer_type=list(emc.keys()),
        ),
        attrs=dict(long_name="Extinction to Mass Coefficient", units='m2.g-1', wavelength=ds.l0_wavelength.data),
    )
    # add attributes to aer_type
    ds_towrite["aer_type"] = ds_towrite["aer_type"].assign_attrs({
        'long_name': 'Aerosol type'
    })

    # determines the scene classification for each profile
    scene = _classify_scene(ds_towrite)
    # add scene as new dataarray
    ds_towrite["scene"] = ("time", scene)
    ds_towrite["scene"] = ds_towrite["scene"].assign_attrs({
        'long_name': "Scene classification",
        'definition': '0: aer (aerosols only) / 1: high_cloud (base cloud above 6096 m) / 2: mid_cloud (base cloud between 1981 m and 6096 m) / 3: low-cloud (base cloud below 1981 m) - 4: foc (fog or condensation)'
    })

    # add scene for extinction profile: cloud_above, cloud_below
    retrieval_scene = _classify_retrieval_scene(ds_towrite)
    # add scene as new dataarray
    ds_towrite["retrieval_scene"] = ("time", retrieval_scene)
    ds_towrite["retrieval_scene"] = ds_towrite["retrieval_scene"].assign_attrs({
        'long_name': "Retrieval scene classification",
        'definition': '0: aer (aerosols only) / 1: cloud_above (cloud above reference altitude) / 3: cloud_below (cloud below reference point) / 4: foc (fog or condensation)'
    })

    # drop other variables
    drop_variables = ['start_time', 'cloud_base_height', 'vertical_visibility', 'cbh_uncertainties', 'uncertainties_att_backscatter_0']
    for drop_var in drop_variables:
        ds_towrite = ds_towrite.drop(drop_var)

    # some variables have no dimension. Set it as attribute and drop the variable.
    nodim_variables = ['l0_wavelength', 'station_latitude', 'station_longitude', 'station_altitude']
    for nodim_var in nodim_variables:
        ds_towrite.attrs[nodim_var] = ds_towrite[nodim_var].data
        ds_towrite = ds_towrite.drop(nodim_var)

    # add altitude direction
    ds_towrite["altitude"] = ds_towrite["altitude"].assign_attrs({
        'positive': "up"
    })

    # encoding with compression
    encoding = {}
    for varname, var in ds_towrite.variables.items():
        if varname == "time": continue
        if var.dtype == np.int64:
            encoding[varname] = {"dtype": np.int32, "zlib": True, "chunksizes": var.shape}
        if varname in ["extinction", "clouds_bases", "clouds_peaks", "clouds_tops"]:
            encoding[varname] = {"zlib": True, "chunksizes": var.shape}

    # convert also the quality_flag's variable flag_values attribute also to NC_INT instead of NC_INT64
    ds_towrite["quality_flag"] = ds_towrite.quality_flag.assign_attrs({'flag_values': np.array([0,1,2], dtype=np.int32)})

    # writes to netcdf
    ds_towrite.to_netcdf(path, mode='w', encoding=encoding)