28 #ifndef __DATA_STORE_CONDUIT_HPP__ 29 #define __DATA_STORE_CONDUIT_HPP__ 31 #include "lbann_config.hpp" 33 #include "conduit/conduit_node.hpp" 38 #include <unordered_map> 39 #include <unordered_set> 45 #define LBANN_SAMPLE_ID_PAD 9 46 #define LBANN_DATA_ID_STR(data_id) \ 47 lbann::pad(std::to_string(data_id), LBANN_SAMPLE_ID_PAD, '0') 49 class generic_data_reader;
54 template <
class T1,
class T2>
55 std::size_t
operator()(
const std::pair<T1, T2>& pair)
const 57 return std::hash<T1>()(pair.first) ^ std::hash<T2>()(pair.second);
67 using map_is_t = std::unordered_map<int, size_t>;
74 using map_ss_t = std::unordered_map<size_t, size_t>;
96 void set_shuffled_indices(
const std::vector<int>* indices);
99 size_t get_num_global_indices()
const;
101 void setup(
int mini_batch_size);
105 const std::string sample_list_file,
110 const conduit::Node& get_conduit_node(
int data_id)
const;
117 void set_conduit_node(
int data_id,
118 const conduit::Node& node,
119 bool already_have =
false);
121 void set_preloaded_conduit_node(
int data_id,
const conduit::Node& node);
123 void spill_preloaded_conduit_node(
int data_id,
const conduit::Node& node);
125 const conduit::Node& get_random_node()
const;
127 const conduit::Node& get_random_node(
const std::string& field)
const;
130 conduit::Node& get_empty_node(
int data_id);
156 bool is_fully_loaded()
const;
169 void set_is_preloading(
bool flag);
172 void set_is_explicitly_loading(
bool flag);
185 void set_loading_is_complete();
192 void check_query_flags()
const;
200 void exchange_owner_maps();
203 void build_preloaded_owner_map(
const std::vector<int>& per_rank_list_sizes);
208 for (
auto&& i : owner) {
209 m_owner[std::make_pair(i.first, m_offset_in_partition)] = i.second;
215 void clear_owner_map();
220 m_owner[std::make_pair(i.first, m_offset_in_partition)] = i.second;
228 m_owner[std::make_pair(data_id, m_offset_in_partition)] = owner;
237 void compact_nodes();
241 int get_index_owner(
int idx);
248 void preload_local_cache();
250 void start_exchange_mini_batch_data(
size_t current_pos,
size_t mb_size);
251 void finish_exchange_mini_batch_data();
255 bool has_conduit_node(
int data_id)
const;
260 std::ofstream* m_debug =
nullptr;
261 std::ofstream* m_profile =
nullptr;
266 std::lock_guard<std::mutex> lock(m_mutex);
267 return m_data.size();
278 void flush_debug_file();
285 void flush_profile_file()
const;
288 void write_checkpoint(std::string dir_name);
291 void load_checkpoint(std::string dir_name,
295 void set_profile_msg(std::string);
307 bool test_local_cache_imagenet(
int n);
309 void test_imagenet_node(
int sample_id,
bool dereference =
true);
311 size_t get_mem_usage();
314 bool m_bcast_sample_size =
true;
321 bool m_owner_maps_were_exchanged =
false;
323 bool m_run_checkpoint_test =
false;
326 size_t m_my_num_indices = 0;
329 bool m_spill =
false;
332 bool m_is_spilled =
false;
341 int m_cur_spill_dir_integer = -1;
376 size_t m_mem_seg_length = 0;
379 const std::string m_debug_filename_base =
"debug";
382 const std::string m_profile_filename_base =
"data_store_profile";
385 bool m_was_loaded_from_file =
false;
386 const std::string m_cereal_fn =
"data_store_cereal";
391 const int m_max_files_per_directory = 500;
398 double m_exchange_sample_sizes_time = 0;
401 double m_start_snd_rcv_time = 0;
404 double m_wait_all_time = 0;
407 double m_rebuild_time = 0;
410 double m_exchange_time = 0;
422 bool m_is_setup =
false;
425 bool m_loading_is_complete =
false;
428 bool m_preloading =
false;
436 bool m_explicitly_loading =
false;
442 int m_owner_map_mb_size = 0;
445 int m_compacted_sample_size = 0;
447 bool m_is_local_cache =
false;
449 bool m_node_sizes_vary =
false;
452 bool m_have_sample_sizes =
false;
462 int m_rank_in_world = -1;
471 bool m_mini_batch_data_exchange_started =
false;
490 mutable std::unordered_map<int, conduit::Node>
m_data;
541 void start_exchange_data_by_sample(
size_t current_pos,
size_t mb_size);
542 void finish_exchange_data_by_sample();
544 void setup_data_store_buffers();
547 void build_node_for_sending(
const conduit::Node& node_in,
548 conduit::Node& node_out);
551 void exchange_sample_sizes();
555 int build_indices_i_will_send(
int current_pos,
int mb_size);
559 int build_indices_i_will_recv(
int current_pos,
int mb_size);
561 void error_check_compacted_node(
const conduit::Node& nd,
int data_id);
564 void exchange_local_caches();
569 void get_image_sizes(
map_is_t& sizes, std::vector<std::vector<int>>& indices);
572 void allocate_shared_segment(
map_is_t& sizes,
573 std::vector<std::vector<int>>& indices);
576 void read_files(std::vector<char>& work,
578 std::vector<int>& indices);
581 void compute_image_offsets(
map_is_t& image_sizes,
582 std::vector<std::vector<int>>& indices);
585 void exchange_images(std::vector<char>& work,
587 std::vector<std::vector<int>>& indices);
589 void build_conduit_nodes(
map_is_t& sizes);
592 void fillin_shared_images(
char* images,
size_t size,
size_t offset);
600 void test_checkpoint(
const std::string&);
603 void print_variables();
610 void print_partial_owner_map(
int n);
612 std::string get_conduit_dir()
const;
613 std::string get_cereal_fn()
const;
614 std::string get_metadata_fn()
const;
617 void make_dir_if_it_doesnt_exist(
const std::string& dir);
620 void spill_conduit_node(
const conduit::Node& node,
int data_id);
623 void load_spilled_conduit_nodes();
630 void setup_spill(std::string dir);
646 void open_informational_files();
649 void open_next_conduit_spill_directory();
653 void profile_timing();
655 void setup_checkpoint_test();
657 std::string get_lassen_spill_dir();
659 void verify_sample_size();
670 (*m_profile) << std::endl;
671 flush_profile_file();
674 template <
typename T,
typename... Types>
677 if (!m_world_master) {
683 (*m_profile) << var1 <<
" ";
685 flush_profile_file();
693 (*m_debug) << std::endl;
697 template <
typename T,
typename... Types>
703 (*m_debug) << var1 <<
" ";
711 #endif // __DATA_STORE_CONDUIT_HPP__ bool is_explicitly_loading() const
Returns true if explicitly loading is turned on.
const std::vector< int > * m_shuffled_indices
convenience handle
std::vector< size_t > m_outgoing_msg_sizes
bool is_preloading() const
Returns true if preloading is turned on.
int m_partition_in_trainer
std::unordered_map< size_t, size_t > map_ss_t
std::vector< conduit::Node > m_recv_buffer
std::unordered_map< int, int > map_ii_t
std::vector< std::unordered_set< int > > m_indices_to_send
std::vector< El::mpi::Request< El::byte > > m_send_requests
map_is_t m_sample_sizes
Maps a data_id to its image size.
std::string m_spill_dir_base
Base directory for spilling (offloading) conduit nodes.
bool is_local_cache() const
Returns "true" if running in local cache mode.
int m_num_files_in_cur_spill_dir
Contains the number of conduit nodes that have been written to m_cur_spill_dir.
std::unordered_map< int, size_t > map_is_t
std::mutex m_mutex
used in set_conduit_node(...)
void set_finished_building_map()
Special handling for ras_lipid_conduit_data_reader; may go away in the future.
void add_owner(int data_id, int owner)
Special handling for ras_lipid_conduit_data_reader; may go away in the future.
std::vector< conduit::Node > m_send_buffer
work space; used in exchange_data
std::vector< El::mpi::Request< El::byte > > m_recv_requests
bool m_world_master
convenience handles
map_pssi_t m_owner
Maps an index to the processor that owns the associated data. First value of index is the sample ID an...
generic_data_reader * m_reader
int m_num_partitions_in_trainer
map_ii_t m_recv_sample_sizes
std::vector< std::unordered_set< int > > m_indices_to_recv
std::string m_test_dir
The directory to use for testing checkpointing.
void set_node_sizes_vary()
std::vector< size_t > m_incoming_msg_sizes
std::vector< int > m_recv_data_ids
Contains the list of data IDs that will be received.
std::unordered_map< int, conduit::Node > m_data
Contains the conduit nodes that are "owned" by this rank.
int m_np_in_trainer
number of procs in the trainer; convenience handle
std::string m_cur_spill_dir
Current directory for spilling (writing to file) conduit nodes.
std::unordered_map< int, conduit::Node > m_minibatch_data
std::unordered_map< std::pair< size_t, size_t >, int, size_t_pair_hash > map_pssi_t
int m_offset_in_partition
map_is_t m_image_offsets
Maps a data_id to the image location in a shared memory segment.
std::size_t operator()(const std::pair< T1, T2 > &pair) const
void set_preloaded_owner_map(const std::unordered_map< int, int > &owner)
fills in m_owner, which maps index -> owning processor
data_store_conduit * copy() const
std::string m_profile_filename
void PROFILE(T var1, Types... var2) const
map_ii_t m_spilled_nodes
maps data_id to m_cur_spill_dir_integer.
void DEBUG_DS(T var1, Types... var2)
int get_data_size()
for use during development and debugging
std::unordered_map< int, conduit::Node > m_data_cache
Contains a cache of the conduit nodes that are "owned" by this rank.
void set_is_local_cache(bool flag=true)
turns local cache mode on or off
void set_owner_map(const std::unordered_map< int, int > &m)
std::vector< conduit::Node > m_send_buffer_2
std::string m_debug_filename