Skip to contents

Base for custom workflow

Here is a workflow example that combines all the functions in FLAREr to generate a forecast. You can modify this example to customize your implimentation of FLARE

tmp <- tempdir()
file.copy(system.file("extdata", package = "FLAREr"), tmp, recursive = TRUE)
lake_directory <- file.path(tmp, "extdata")
configure_run_file <- "configure_run.yml"
config_set_name <- "default"

Flare

FLAREr::ignore_sigpipe()

config <- FLAREr::set_configuration(configure_run_file, lake_directory, 
                                    config_set_name = config_set_name)

config <- FLAREr::get_restart_file(config, lake_directory)

message(paste0("     Running forecast that starts on: ", config$run_config$start_datetime))

if(!is.null(config$model_settings$par_config_file)){
  if(!is.na(config$model_settings$par_config_file)){
    pars_config <- readr::read_csv(file.path(config$file_path$configuration_directory,
                                             config$model_settings$par_config_file), 
                                   col_types = readr::cols())
  }
}



obs_config <- readr::read_csv(file.path(config$file_path$configuration_directory, 
                                        config$model_settings$obs_config_file), 
                              col_types = readr::cols())
states_config <- readr::read_csv(file.path(config$file_path$configuration_directory,
                                           config$model_settings$states_config_file), 
                                 col_types = readr::cols())

if(!config$met$use_observed_met){
  obs_met_file = NULL
}else{
  obs_met_file <- file.path(config$file_path$qaqc_data_directory, 
                            config$met$observed_filename)
}

met_start_datetime <- lubridate::as_datetime(config$run_config$start_datetime)
met_forecast_start_datetime <- lubridate::as_datetime(config$run_config$forecast_start_datetime)

if(config$run_config$forecast_horizon > 16 & config$met$use_forecasted_met){
  met_forecast_start_datetime <- met_forecast_start_datetime - lubridate::days(config$met$forecast_lag_days)
  if(met_forecast_start_datetime < met_start_datetime){
    met_start_datetime <- met_forecast_start_datetime
  }
}

if(is.null(config$met$use_met_s3)){
  config$met$use_met_s3 <- TRUE
}

met_out <- FLAREr::generate_met_files_arrow(obs_met_file = obs_met_file,
                                            out_dir = config$file_path$execute_directory,
                                            start_datetime = met_start_datetime,
                                            end_datetime = config$run_config$end_datetime,
                                            forecast_start_datetime = met_forecast_start_datetime,
                                            forecast_horizon =  config$run_config$forecast_horizon,
                                            site_id = config$location$site_id,
                                            use_s3 = config$met$use_met_s3,
                                            bucket = config$s3$drivers$bucket,
                                            endpoint = config$s3$drivers$endpoint,
                                            local_directory = file.path(lake_directory,config$met$local_directory),
                                            use_forecast = config$met$use_forecasted_met,
                                            use_ler_vars = config$met$use_ler_vars,
                                            use_siteid_s3 = TRUE)

if(is.null(config$inflow$use_inflow_s3)){
  config$inflow$use_inflow_s3 <- TRUE
}

if(config$inflow$include_inflow){
  if(config$run_config$forecast_horizon > 0){
    inflow_forecast_dir <- file.path(config$inflow$forecast_inflow_model, config$location$site_id, "0", lubridate::as_date(config$run_config$forecast_start_datetime))
  }else{
    inflow_forecast_dir <- NULL
  }
  
  variables <- c("time", "FLOW", "TEMP", "SALT")

  inflow_outflow_files <- FLAREr::create_inflow_outflow_files_arrow(inflow_forecast_dir = inflow_forecast_dir,
                                                                    inflow_obs = file.path(lake_directory, "targets",config$location$site_id, config$inflow$observed_filename),
                                                                    variables = variables,
                                                                    out_dir = config$file_path$execute_directory,
                                                                    start_datetime = config$run_config$start_datetime,
                                                                    end_datetime = config$run_config$end_datetime,
                                                                    forecast_start_datetime = config$run_config$forecast_start_datetime,
                                                                    forecast_horizon =  config$run_config$forecast_horizon,
                                                                    site_id = config$location$site_id,
                                                                    use_s3 = config$run_config$use_s3,
                                                                    bucket = config$s3$inflow_drivers$bucket,
                                                                    endpoint = config$s3$inflow_drivers$endpoint,
                                                                    local_directory = file.path(lake_directory, config$inflow$local_directory, inflow_forecast_dir),
                                                                    use_forecast = config$inflow$use_forecasted_inflow,
                                                                    use_ler_vars = config$inflow$use_ler_vars)
  
}else{
  inflow_outflow_files <- list()
  inflow_outflow_files$inflow_file_name <- NULL
  inflow_outflow_files$outflow_file_name <- NULL
}

obs_insitu_file <- file.path(config$file_path$qaqc_data_directory, config$da_setup$obs_filename)

obs <- FLAREr::create_obs_matrix(cleaned_observations_file_long = obs_insitu_file,
                                 obs_config = obs_config,
                                 config)

states_config <- FLAREr::generate_states_to_obs_mapping(states_config, obs_config)

model_sd <- FLAREr::initiate_model_error(config, states_config)

init <- FLAREr::generate_initial_conditions(states_config,
                                            obs_config,
                                            pars_config,
                                            obs,
                                            config,
                                            historical_met_error = met_out$historical_met_error)
#Run EnKF
da_forecast_output <- FLAREr::run_da_forecast(states_init = init$states,
                                              pars_init = init$pars,
                                              aux_states_init = init$aux_states_init,
                                              obs = obs,
                                              obs_sd = obs_config$obs_sd,
                                              model_sd = model_sd,
                                              working_directory = config$file_path$execute_directory,
                                              met_file_names = met_out$filenames,
                                              inflow_file_names = inflow_outflow_files$inflow_file_name,
                                              outflow_file_names = inflow_outflow_files$outflow_file_name,
                                              config = config,
                                              pars_config = pars_config,
                                              states_config = states_config,
                                              obs_config = obs_config,
                                              management = NULL,
                                              da_method = config$da_setup$da_method,
                                              par_fit_method = config$da_setup$par_fit_method,
                                              debug = FALSE,
                                              log_wq = FALSE,
                                              obs_secchi = NULL,
                                              obs_depth = NULL)

message("Writing netcdf")
saved_file <- FLAREr::write_forecast_netcdf(da_forecast_output = da_forecast_output,
                                            forecast_output_directory = config$file_path$forecast_output_directory,
                                            use_short_filename = TRUE)

message("Writing arrow forecast")
forecast_df <- FLAREr::write_forecast_arrow(da_forecast_output = da_forecast_output,
                                            use_s3 = config$run_config$use_s3,
                                            bucket = config$s3$forecasts_parquet$bucket,
                                            endpoint = config$s3$forecasts_parquet$endpoint,
                                            local_directory = file.path(lake_directory, "forecasts/parquet"))

message("Scoring forecasts")
if(config$output_settings$evaluate_past){
  reference_datetime_format <- "%Y-%m-%d %H:%M:%S"
  past_days <- strftime(lubridate::as_datetime(forecast_df$reference_datetime[1]) - lubridate::days(config$run_config$forecast_horizon), tz = "UTC")
  
  vars <- FLAREr:::arrow_env_vars()
  s3 <- arrow::s3_bucket(bucket = config$s3$forecasts_parquet$bucket, endpoint_override = config$s3$forecasts_parquet$endpoint)
  past_forecasts <- arrow::open_dataset(s3) |>
    dplyr::filter(model_id == forecast_df$model_id[1],
                  site_id == forecast_df$site_id[1],
                  reference_datetime > past_days) |>
    dplyr::collect()
  FLAREr:::unset_arrow_vars(vars)
}else{
  past_forecasts <- NULL
}

combined_forecasts <- dplyr::bind_rows(forecast_df, past_forecasts)

FLAREr::generate_forecast_score_arrow(targets_file = obs_insitu_file,
                                      forecast_df = combined_forecasts,
                                      use_s3 = config$run_config$use_s3,
                                      bucket = config$s3$scores$bucket,
                                      endpoint = config$s3$scores$endpoint,
                                      local_directory = file.path(lake_directory, "scores/parquet"),
                                      variable_types = config$output_settings$variables_in_scores)

message("Generating plot")
FLAREr::plotting_general_2(file_name = saved_file,
                           target_file = obs_insitu_file,
                           ncore = 2,
                           obs_csv = FALSE)

message("Putting forecast")
FLAREr::put_forecast(saved_file, eml_file_name = NULL, config)

message(paste0("successfully generated flare forecats for: ", basename(saved_file)))