CALCUL AND SAVE SST STATS

[2]:
import numpy as N
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
import sys
import dask
from os import path
from dask.distributed import Client, LocalCluster, progress
import time
from tools import my_percentile
from scipy.optimize import least_squares,curve_fit
import dask_hpcconfig
from dask_jobqueue import PBSCluster
import glob

conseils dask

pour Datarmor il vaut mieux avoir 1 thread/worker il vaut mieux avoir des chunks assez fa

[3]:
#pour l'ensemble des pas de temps
#cluster = PBSCluster(processes=3,cores=9) #16min 44
cluster = PBSCluster(processes=3,cores=3) #16min 44

cluster.scale(jobs=4)
print(cluster.dashboard_link)
/home1/datahome/mcaillau/conda-env/pydask3/lib/python3.10/site-packages/dask_jobqueue/core.py:255: FutureWarning: job_extra has been renamed to job_extra_directives. You are still using it (even if only set to []; please also check config files). If you did not set job_extra_directives yet, job_extra will be respected for now, but it will be removed in a future release. If you already set job_extra_directives, job_extra is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
/home1/datahome/mcaillau/conda-env/pydask3/lib/python3.10/site-packages/dask_jobqueue/pbs.py:82: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
http://10.148.1.73:8787/status
/home1/datahome/mcaillau/conda-env/pydask3/lib/python3.10/site-packages/dask_jobqueue/core.py:255: FutureWarning: job_extra has been renamed to job_extra_directives. You are still using it (even if only set to []; please also check config files). If you did not set job_extra_directives yet, job_extra will be respected for now, but it will be removed in a future release. If you already set job_extra_directives, job_extra is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
/home1/datahome/mcaillau/conda-env/pydask3/lib/python3.10/site-packages/dask_jobqueue/pbs.py:82: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
[4]:
# explicitly connect to the cluster we just created
client = Client(cluster)
[5]:
#read grid
ds_grid=xr.open_dataset('/home/shom_simuref/CROCO/ODC/CONFIGS/MEDITERRANEE_GLOBALE/CROCO_FILES/test2.nc')
[6]:
path="/home/shom_simuref/CROCO/ODC/SIMU-RESULT/HINDCAST_2012_2013/OUTPUTS_201207_201307/"
[7]:
list_model=glob.glob(path+"croco_his_surf_2*.nc")
[8]:
chunks={'time':121,'xi_rho':315,'eta_rho':878}
chunks={'time':800,'s_rho':-1,'xi_rho':865,'eta_rho':936}
date_start="2012-06-01"
date_end="2013-08-01"
time_range=slice(date_start,date_end)
[9]:
#store data dir
stat_dir='/home/shom_simuref/CROCO/ODC/POSTPROC/SST/'
[10]:
ds=xr.open_mfdataset(list_model, parallel=False,chunks="auto",
                        concat_dim="time", combine="nested",
                        data_vars='minimal', coords='minimal', compat='override')
#                        preprocess=partial_func,data_vars='minimal', coords='minimal', compat='override')

print('data size in GB {:0.2f}\n'.format(ds.nbytes / 1e9))

data size in GB 425.54

[11]:
#ds=ds.isel(time=slice(0,-1,12))
[12]:
#because of MPINOLAND
for var in ["xi_rho","eta_rho","xi_u","eta_v","lon_rho","lat_rho","lon_u","lat_v"]:
    ds[var]=ds_grid[var]
[13]:
#remove duplicates
_,index=N.unique(ds.time,return_index=True)
ds=ds.isel(time=index)
[14]:
#select time range
ds=ds.sel(time=time_range)
[15]:
ds=ds.chunk(chunks)
ds.temp
[15]:
<xarray.DataArray 'temp' (time: 8760, eta_rho: 936, xi_rho: 2595)>
dask.array<rechunk-merge, shape=(8760, 936, 2595), dtype=float32, chunksize=(800, 936, 865), chunktype=numpy.ndarray>
Coordinates:
  * xi_rho   (xi_rho) int64 0 1 2 3 4 5 6 ... 2588 2589 2590 2591 2592 2593 2594
  * eta_rho  (eta_rho) int64 0 1 2 3 4 5 6 7 ... 928 929 930 931 932 933 934 935
  * time     (time) datetime64[ns] 2012-07-01T13:00:00 ... 2013-07-01T12:00:00
    lon_rho  (eta_rho, xi_rho) float64 dask.array<chunksize=(936, 865), meta=np.ndarray>
    lat_rho  (eta_rho, xi_rho) float64 dask.array<chunksize=(936, 865), meta=np.ndarray>
Attributes:
    long_name:  SST
    units:      Celsius
    field:      
[16]:
coord_dict={"xi_rho":"X","eta_rho":"Y","xi_u":"X_U","eta_v":"Y_V"}
ds=ds.assign_coords({"X":ds_grid.lon_rho[0,:], "Y":ds_grid.lat_rho[:,0]})
ds2=ds.swap_dims(coord_dict)
[17]:
sst=ds2.temp
[18]:
sst
[18]:
<xarray.DataArray 'temp' (time: 8760, Y: 936, X: 2595)>
dask.array<rechunk-merge, shape=(8760, 936, 2595), dtype=float32, chunksize=(800, 936, 865), chunktype=numpy.ndarray>
Coordinates:
    xi_rho   (X) int64 0 1 2 3 4 5 6 7 ... 2588 2589 2590 2591 2592 2593 2594
    eta_rho  (Y) int64 0 1 2 3 4 5 6 7 8 ... 927 928 929 930 931 932 933 934 935
  * time     (time) datetime64[ns] 2012-07-01T13:00:00 ... 2013-07-01T12:00:00
    lon_rho  (Y, X) float64 dask.array<chunksize=(936, 865), meta=np.ndarray>
    lat_rho  (Y, X) float64 dask.array<chunksize=(936, 865), meta=np.ndarray>
  * X        (X) float64 -7.0 -6.983 -6.967 -6.95 ... 36.18 36.2 36.22 36.23
  * Y        (Y) float64 30.23 30.25 30.27 30.28 30.3 ... 45.77 45.78 45.8 45.82
Attributes:
    long_name:  SST
    units:      Celsius
    field:      

MONTHLY MEAN

[79]:
%%time
#4min12s on 4 workers and 3 proc par worker
sst=ds2.temp
print('compute mean')
sst_mean = sst.resample(time="1MS").mean().compute()
sst_mean=sst_mean.assign_attrs({"start_date":date_start,"end_date":date_end})
#write results for faster treatment
sst_mean.to_netcdf(f'{stat_dir}/mean_sst_croco.nc',mode='w')
compute mean
CPU times: user 18.9 s, sys: 2.13 s, total: 21.1 s
Wall time: 4min 12s

DAILY MEAN

[19]:
%%time
#4min12s on 4 workers and 3 proc par worker
sst=ds2.temp
print('compute daily mean')
sst_daily_mean = sst.resample(time="1D").mean().compute()
sst_daily_mean=sst_daily_mean.assign_attrs({"start_date":date_start,"end_date":date_end})
#write results for faster treatment
sst_daily_mean.to_netcdf(f'{stat_dir}/mean_daily_sst_croco.nc',mode='w')
compute daily mean
CPU times: user 39.2 s, sys: 9.66 s, total: 48.8 s
Wall time: 6min 31s

PERCENTILES

[19]:
from tools import my_percentile

re chunks to percentile

[20]:
#sst=sst.chunk({'time':-1,'Y':52,'X':173})
[21]:
#%%time
#per95=my_percentile(sst,0.95,"time")
#per95=per95.compute()
#per95.to_netcdf(f'{stat_dir}/sst_percentile_95.nc',mode='w')
[22]:
#%%time
#per5=my_percentile(sst,0.05,"time")
#per5=per5.compute()
#per5.to_netcdf(f'{stat_dir}/sst_percentile_5.nc',mode='w')

SEVIRI

[20]:
seviri_path="/home/shom_simuref/CROCO/ODC/DATA/SST_SEVIRI/"
[21]:
chunks_sevi={'time':444,"lon":-1,"lat":-1}

[22]:
ds_sevi=xr.open_mfdataset(glob.glob(seviri_path+'201[2-3]*.nc'),chunks=chunks_sevi,
                         combine="by_coords",
                          data_vars='minimal', coords='minimal', compat='override')

reduce to croco time bnds

[23]:
time_bnds=slice(ds.time[0],ds.time[-1])
[24]:
ds_sevi=ds_sevi.sel(time=time_bnds)
[25]:
sst_sevi=ds_sevi.sea_surface_temperature-273.15

monthly mean

[26]:
sst_sevi_mean=sst_sevi.resample(time='1MS').mean()
[28]:
%%time
sst_sevi_mean=sst_sevi_mean.compute()
sst_sevi=sst_sevi.assign_attrs({"start_date":date_start,"end_date":date_end})
sst_sevi_mean.to_netcdf(f'{stat_dir}/mean_sst_sevi.nc',mode="w")
CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 59.5 ms

daily mean

[29]:
sst_sevi_daily_mean=sst_sevi.resample(time='1D').mean()
sst_sevi_daily_mean=sst_sevi_daily_mean.assign_attrs({"start_date":date_start,"end_date":date_end})
sst_sevi_daily_mean.to_netcdf(f'{stat_dir}/mean_daily_sst_sevi.nc',mode="w")

percentiles

re chunk for PERCENTILE

[31]:
sst_sevi=sst_sevi.chunk({'time':-1,"lat":-1,"lon":-1})
[32]:
sst_sevi2=sst_sevi.interpolate_na(method="linear",limit=None,max_gap=None,dim="time")
[33]:
sst_sevi2=sst_sevi2.ffill('time')
sst_sevi2=sst_sevi2.bfill('time')

[34]:
#%%time
##35 sec sur 4 jobs de 6 processes et 12 cores ( 2 threads/worker)
#sst_sevi_p95=my_percentile(sst_sevi2,0.95,"time").compute()
#sst_sevi_p95.to_netcdf(f'{stat_dir}/sst_sevi_p95.nc',mode="w")
[35]:
#%%time
#sst_sevi_p5=my_percentile(sst_sevi2,0.05,"time").compute()
#sst_sevi_p5.to_netcdf(f'{stat_dir}/sst_sevi_p5.nc',mode="w")

RMSE CROCO-SEVIRI

First compute seviri

[36]:
%%time
#1min 21s sur 4 workers avec 3 procs par worker
sst_sevi2=sst_sevi2.compute()
CPU times: user 5.1 s, sys: 3.95 s, total: 9.05 s
Wall time: 1min 21s

interp croco on seviri horizontal grid

[37]:
sst_croco=sst.interp(X=sst_sevi.lon,Y=sst_sevi.lat)
CPU times: user 36 ms, sys: 0 ns, total: 36 ms
Wall time: 115 ms
[38]:
sst_croco=sst_croco.chunk({'time':2000})
[39]:
%%time
#6 min 23s sur 4 workers avec 3 procs par worker
sst_croco=sst_croco.compute()
CPU times: user 41.7 s, sys: 35.6 s, total: 1min 17s
Wall time: 6min 23s

interp croco on seviri time grid

[40]:
%%time
#40 s sur 4 workers avec 3 procs par worker
sst_croco2=sst_croco.interp(time=sst_sevi2.time).compute()

CPU times: user 16.2 s, sys: 25.8 s, total: 42 s
Wall time: 40.1 s

Compute the RMSE

[47]:
import xskillscore as xs

calcul of monthly rmse

[48]:
ds_merge=xr.merge([sst_croco2.rename('croco'),sst_sevi2.rename('seviri')])
[56]:
#custom function to use with resample in xarray
def calc_rmse(data):
    return xs.rmse(data.croco,data.seviri,dim="time",skipna=True)
[50]:
%%time
#22s sur 4 workers avec 3 procs par worker
rmse_month=ds_merge.resample(time='1MS').apply(calc_rmse).compute()
CPU times: user 13.1 s, sys: 9.64 s, total: 22.7 s
Wall time: 22 s
[65]:
rmse_month=rmse_month.assign_attrs({"start_date":date_start,"end_date":date_end})
rmse_month=rmse_month.rename('rmse')
rmse_month.to_netcdf(f'{stat_dir}/rmse_croco_sevi_monthly.nc',mode='w')

calcul of annual rmse

[64]:
%%time
#31s sur 4 workers avec 3 procs par worker
rmse=xs.rmse(sst_croco2,sst_sevi2,dim="time",skipna=True)
rmse=rmse.compute()
rmse=rmse.rename('rmse')
rmse=rmse.assign_attrs({"start_date":date_start,"end_date":date_end})
rmse.to_netcdf(f'{stat_dir}/rmse_croco_sevi_annual.nc',mode='w')

CPU times: user 23.8 s, sys: 9.01 s, total: 32.8 s
Wall time: 31.8 s
[61]:

[ ]: