LBANN  0.103.0
Livermore Big Artificial Neural Network Toolkit
data_store_conduit.hpp
Go to the documentation of this file.
1 // Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC.
3 // Produced at the Lawrence Livermore National Laboratory.
4 // Written by the LBANN Research Team (B. Van Essen, et al.) listed in
5 // the CONTRIBUTORS file. <lbann-dev@llnl.gov>
6 //
7 // LLNL-CODE-697807.
8 // All rights reserved.
9 //
10 // This file is part of LBANN: Livermore Big Artificial Neural Network
11 // Toolkit. For details, see http://software.llnl.gov/LBANN or
12 // https://github.com/LLNL/LBANN.
13 //
14 // Licensed under the Apache License, Version 2.0 (the "License"); you
15 // may not use this file except in compliance with the License. You may
16 // obtain a copy of the License at:
17 //
18 // http://www.apache.org/licenses/LICENSE-2.0
19 //
20 // Unless required by applicable law or agreed to in writing, software
21 // distributed under the License is distributed on an "AS IS" BASIS,
22 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
23 // implied. See the License for the specific language governing
24 // permissions and limitations under the license.
25 //
27 
28 #ifndef __DATA_STORE_CONDUIT_HPP__
29 #define __DATA_STORE_CONDUIT_HPP__
30 
31 #include "lbann_config.hpp"
32 
33 #include "conduit/conduit_node.hpp"
34 #include "lbann/base.hpp"
35 #include "lbann/comm.hpp"
37 #include <mutex>
38 #include <unordered_map>
39 #include <unordered_set>
40 
41 namespace lbann {
42 
43 // support for encoding data_id in conduit::Node, used by
44 // conduit_data_store and associated code
// Width argument handed to lbann::pad by LBANN_DATA_ID_STR below.
45 #define LBANN_SAMPLE_ID_PAD 9
// Converts data_id with std::to_string and passes the result to lbann::pad
// together with LBANN_SAMPLE_ID_PAD and the character '0'.
// NOTE(review): lbann::pad is defined elsewhere; presumably this yields a
// zero-padded, fixed-width id string -- confirm against its definition.
46 #define LBANN_DATA_ID_STR(data_id) \
47  lbann::pad(std::to_string(data_id), LBANN_SAMPLE_ID_PAD, '0')
48 
49 class generic_data_reader;
50 
/// Hash functor for std::pair keys; used as the hasher of map_pssi_t.
///
/// The two element hashes are mixed with an order-sensitive combine step
/// rather than a plain XOR. A plain XOR is symmetric, so (a, b) and (b, a)
/// collide, and it maps every pair with equal elements to the same value.
struct size_t_pair_hash
{
  /// @param pair  the key to hash
  /// @return a hash value that depends on both elements and on their order
  template <class T1, class T2>
  std::size_t operator()(const std::pair<T1, T2>& pair) const
  {
    std::size_t seed = std::hash<T1>()(pair.first);
    const std::size_t second_hash = std::hash<T2>()(pair.second);
    // Golden-ratio constant plus shifted copies of the running seed, so that
    // swapping the two elements changes the result.
    seed ^= second_hash + static_cast<std::size_t>(0x9e3779b97f4a7c15ULL) +
            (seed << 6) + (seed >> 2);
    return seed;
  }
};
60 
62 {
63 
64 public:
65  // need to quickly change from unordered_map to map for debugging
66  using map_ii_t = std::unordered_map<int, int>;
67  using map_is_t = std::unordered_map<int, size_t>;
68 
69  // Hash map for tracking the node and hyperslab partition ID
70  using map_pssi_t =
71  std::unordered_map<std::pair<size_t, size_t>, int, size_t_pair_hash>;
72 
73  // not currently used; will be in the future
74  using map_ss_t = std::unordered_map<size_t, size_t>;
75 
78 
81 
83  data_store_conduit(const data_store_conduit&, const std::vector<int>&);
84 
86  data_store_conduit& operator=(const data_store_conduit&);
87 
/// Returns a heap-allocated copy of this data store, created with `new`
/// through the copy constructor. A raw pointer is returned and nothing here
/// retains it, so the caller is presumably responsible for deleting it.
88  data_store_conduit* copy() const { return new data_store_conduit(*this); }
89 
92 
93  void set_data_reader_ptr(generic_data_reader* reader);
94 
96  void set_shuffled_indices(const std::vector<int>* indices);
97 
99  size_t get_num_global_indices() const;
100 
101  void setup(int mini_batch_size);
102 
103  // TODO FIXME
104  void check_mem_capacity(lbann_comm* comm,
105  const std::string sample_list_file,
106  size_t stride,
107  size_t offset);
108 
110  const conduit::Node& get_conduit_node(int data_id) const;
111 
117  void set_conduit_node(int data_id,
118  const conduit::Node& node,
119  bool already_have = false);
120 
121  void set_preloaded_conduit_node(int data_id, const conduit::Node& node);
122 
123  void spill_preloaded_conduit_node(int data_id, const conduit::Node& node);
124 
125  const conduit::Node& get_random_node() const;
126 
127  const conduit::Node& get_random_node(const std::string& field) const;
128 
130  conduit::Node& get_empty_node(int data_id);
131 
132  //=================================================================
133  // methods for setting and querying the data store's mode
134  //=================================================================
/// Returns true if preloading is turned on (reads m_preloading).
139  bool is_preloading() const { return m_preloading; }
140 
/// Returns true if explicit loading is turned on (reads
/// m_explicitly_loading).
150  bool is_explicitly_loading() const { return m_explicitly_loading; }
151 
156  bool is_fully_loaded() const;
157 
/// Returns true if running in local cache mode (reads m_is_local_cache).
166  bool is_local_cache() const { return m_is_local_cache; }
167 
169  void set_is_preloading(bool flag);
170 
172  void set_is_explicitly_loading(bool flag);
173 
185  void set_loading_is_complete();
186 
/// Turns local cache mode on or off by storing `flag` in m_is_local_cache;
/// calling it with no argument turns the mode on.
188  void set_is_local_cache(bool flag = true) { m_is_local_cache = flag; }
189 
192  void check_query_flags() const;
193 
194  //=================================================================
195  // END methods for setting and querying the data store's mode
196  //=================================================================
197 
198  // XX void { m_owner_maps_were_exchanged = false; }
200  void exchange_owner_maps();
201 
203  void build_preloaded_owner_map(const std::vector<int>& per_rank_list_sizes);
204 
206  void set_preloaded_owner_map(const std::unordered_map<int, int>& owner)
207  {
208  for (auto&& i : owner) {
209  m_owner[std::make_pair(i.first, m_offset_in_partition)] = i.second;
210  }
211  }
212 
215  void clear_owner_map();
216 
217  void set_owner_map(const std::unordered_map<int, int>& m)
218  {
219  for (auto&& i : m) {
220  m_owner[std::make_pair(i.first, m_offset_in_partition)] = i.second;
221  }
222  }
223 
226  void add_owner(int data_id, int owner)
227  {
228  m_owner[std::make_pair(data_id, m_offset_in_partition)] = owner;
229  }
230 
/// Special handling for ras_lipid_conduit_data_reader; may go away in the
/// future. Sets m_owner_maps_were_exchanged to true.
233  void set_finished_building_map() { m_owner_maps_were_exchanged = true; }
234 
237  void compact_nodes();
238 
241  int get_index_owner(int idx);
242 
248  void preload_local_cache();
249 
250  void start_exchange_mini_batch_data(size_t current_pos, size_t mb_size);
251  void finish_exchange_mini_batch_data();
252 
/// Sets m_node_sizes_vary to true.
253  void set_node_sizes_vary() { m_node_sizes_vary = true; }
254 
255  bool has_conduit_node(int data_id) const;
256 
260  std::ofstream* m_debug = nullptr;
261  std::ofstream* m_profile = nullptr;
262 
265  {
266  std::lock_guard<std::mutex> lock(m_mutex);
267  return m_data.size();
268  }
269 
271  void copy_members(const data_store_conduit& rhs);
272 
278  void flush_debug_file();
279 
285  void flush_profile_file() const;
286 
288  void write_checkpoint(std::string dir_name);
289 
291  void load_checkpoint(std::string dir_name,
292  generic_data_reader* reader = nullptr);
293 
295  void set_profile_msg(std::string);
296 
307  bool test_local_cache_imagenet(int n);
308 
309  void test_imagenet_node(int sample_id, bool dereference = true);
310 
311  size_t get_mem_usage();
312 
313 private:
314  bool m_bcast_sample_size = true;
315 
316  // if not null, 'm_other' points from a train to a validation
317  // data store; this permits communication which is needed in
318  // special cases (e.g, see: data_reader_npz_ras_lipid.cpp)
319  data_store_conduit* m_other = nullptr;
320 
321  bool m_owner_maps_were_exchanged = false;
322 
323  bool m_run_checkpoint_test = false;
324 
326  size_t m_my_num_indices = 0;
327 
329  bool m_spill = false;
330 
332  bool m_is_spilled = false;
333 
335  std::ofstream m_metadata;
336 
338  std::string m_spill_dir_base;
339 
341  int m_cur_spill_dir_integer = -1;
342 
348  std::string m_cur_spill_dir;
349 
355  std::string m_test_dir;
356 
364 
367 
369  // Guards m_sample_sizes, m_data, and m_image_offsets
370  mutable std::mutex m_mutex;
371  // Guards m_compacted_sample_size
372  std::mutex m_mutex_2;
373 
// Start of a raw memory segment; null until one is assigned.
// NOTE(review): presumably the segment set up by allocate_shared_segment()
// -- confirm against the implementation file.
char* m_mem_seg = nullptr;
376  size_t m_mem_seg_length = 0;
377  std::string m_seg_name;
378 
379  const std::string m_debug_filename_base = "debug";
380  std::string m_debug_filename;
381 
382  const std::string m_profile_filename_base = "data_store_profile";
383  std::string m_profile_filename;
384 
385  bool m_was_loaded_from_file = false;
386  const std::string m_cereal_fn = "data_store_cereal";
387 
391  const int m_max_files_per_directory = 500;
392 
393  //===========================================================
394  // timers for profiling exchange_data
395  //===========================================================
396 
397  // applicable to imagenet; NA for JAG
398  double m_exchange_sample_sizes_time = 0;
399 
400  // time from beginning of exchange_data_by_sample to wait_all
401  double m_start_snd_rcv_time = 0;
402 
403  // time for wait_all
404  double m_wait_all_time = 0;
405 
406  // time to unpack nodes received from other ranks
407  double m_rebuild_time = 0;
408 
409  // total time for exchange_mini_batch_data
410  double m_exchange_time = 0;
411 
412  // sanity check:
413  // m_start_snd_rcv_time + m_wait_all_time + m_rebuild_time
414  // should be only slightly less than m_exchange_time;
415  // Note that, for imagenet, the first call to exchange_data_by_sample
416  // involves additional communication for exchanging sample sizes
417 
418  //===========================================================
419  // END: timers for profiling exchange_data
420  //===========================================================
421 
422  bool m_is_setup = false;
423 
425  bool m_loading_is_complete = false;
426 
428  bool m_preloading = false;
429 
436  bool m_explicitly_loading = false;
437 
442  int m_owner_map_mb_size = 0;
443 
445  int m_compacted_sample_size = 0;
446 
447  bool m_is_local_cache = false;
448 
449  bool m_node_sizes_vary = false;
450 
452  bool m_have_sample_sizes = false;
453 
455 
456  lbann_comm* m_comm = nullptr;
457 
462  int m_rank_in_world = -1; // -1 for debugging
465 
469 
471  bool m_mini_batch_data_exchange_started = false;
472 
480 
// Convenience handle: pointer to a vector of indices that this class only
// reads through (pointee is const). Defaults to null so the member is never
// left indeterminate.
// NOTE(review): presumably assigned by set_shuffled_indices() -- confirm.
const std::vector<int>* m_shuffled_indices = nullptr;
483 
490  mutable std::unordered_map<int, conduit::Node> m_data;
491 
499  std::unordered_map<int, conduit::Node> m_data_cache;
500 
502  std::vector<int> m_recv_data_ids;
504 
507  std::unordered_map<int, conduit::Node> m_minibatch_data;
508 
510  std::vector<conduit::Node> m_send_buffer;
511  std::vector<conduit::Node> m_send_buffer_2;
512  std::vector<El::mpi::Request<El::byte>> m_send_requests;
513  std::vector<El::mpi::Request<El::byte>> m_recv_requests;
514  std::vector<conduit::Node> m_recv_buffer;
515  std::vector<size_t> m_outgoing_msg_sizes;
516  std::vector<size_t> m_incoming_msg_sizes;
517 
524 
527 
531  std::vector<std::unordered_set<int>> m_indices_to_send;
532 
535  std::vector<std::unordered_set<int>> m_indices_to_recv;
536 
537  //=========================================================================
538  // methods follow
539  //=========================================================================
540 
541  void start_exchange_data_by_sample(size_t current_pos, size_t mb_size);
542  void finish_exchange_data_by_sample();
543 
544  void setup_data_store_buffers();
545 
547  void build_node_for_sending(const conduit::Node& node_in,
548  conduit::Node& node_out);
549 
551  void exchange_sample_sizes();
552 
555  int build_indices_i_will_send(int current_pos, int mb_size);
556 
559  int build_indices_i_will_recv(int current_pos, int mb_size);
560 
561  void error_check_compacted_node(const conduit::Node& nd, int data_id);
562 
564  void exchange_local_caches();
565 
569  void get_image_sizes(map_is_t& sizes, std::vector<std::vector<int>>& indices);
570 
572  void allocate_shared_segment(map_is_t& sizes,
573  std::vector<std::vector<int>>& indices);
574 
576  void read_files(std::vector<char>& work,
577  map_is_t& sizes,
578  std::vector<int>& indices);
579 
581  void compute_image_offsets(map_is_t& image_sizes,
582  std::vector<std::vector<int>>& indices);
583 
585  void exchange_images(std::vector<char>& work,
586  map_is_t& image_sizes,
587  std::vector<std::vector<int>>& indices);
588 
589  void build_conduit_nodes(map_is_t& sizes);
590 
592  void fillin_shared_images(char* images, size_t size, size_t offset);
593 
600  void test_checkpoint(const std::string&);
601 
603  void print_variables();
604 
610  void print_partial_owner_map(int n);
611 
612  std::string get_conduit_dir() const;
613  std::string get_cereal_fn() const;
614  std::string get_metadata_fn() const;
615 
617  void make_dir_if_it_doesnt_exist(const std::string& dir);
618 
620  void spill_conduit_node(const conduit::Node& node, int data_id);
621 
623  void load_spilled_conduit_nodes();
624 
630  void setup_spill(std::string dir);
631 
637  void save_state();
638 
646  void open_informational_files();
647 
649  void open_next_conduit_spill_directory();
650 
653  void profile_timing();
654 
655  void setup_checkpoint_test();
656 
657  std::string get_lassen_spill_dir();
658 
659  void verify_sample_size();
660 
661  //=========================================================================
662  // functions and templates for optional profiling and debug files follow
663  //=========================================================================
664 
665  void PROFILE() const
666  {
667  if (!m_profile) {
668  return;
669  }
670  (*m_profile) << std::endl;
671  flush_profile_file();
672  }
673 
674  template <typename T, typename... Types>
675  void PROFILE(T var1, Types... var2) const
676  {
677  if (!m_world_master) {
678  return;
679  }
680  if (!m_profile) {
681  return;
682  }
683  (*m_profile) << var1 << " ";
684  PROFILE(var2...);
685  flush_profile_file();
686  }
687 
688  void DEBUG_DS()
689  {
690  if (!m_debug) {
691  return;
692  }
693  (*m_debug) << std::endl;
694  flush_debug_file();
695  }
696 
697  template <typename T, typename... Types>
698  void DEBUG_DS(T var1, Types... var2)
699  {
700  if (!m_debug) {
701  return;
702  }
703  (*m_debug) << var1 << " ";
704  DEBUG_DS(var2...);
705  flush_debug_file();
706  }
707 };
708 
709 } // namespace lbann
710 
711 #endif // __DATA_STORE_CONDUIT_HPP__
bool is_explicitly_loading() const
Returns true if explicitly loading is turned on.
const std::vector< int > * m_shuffled_indices
convenience handle
std::vector< size_t > m_outgoing_msg_sizes
bool is_preloading() const
Returns true if preloading is turned on.
std::unordered_map< size_t, size_t > map_ss_t
std::vector< conduit::Node > m_recv_buffer
std::unordered_map< int, int > map_ii_t
std::vector< std::unordered_set< int > > m_indices_to_send
std::vector< El::mpi::Request< El::byte > > m_send_requests
map_is_t m_sample_sizes
Maps a data_id to its image size.
std::string m_spill_dir_base
Base directory for spilling (offloading) conduit nodes.
bool is_local_cache() const
Returns "true" if running in local cache mode.
int m_num_files_in_cur_spill_dir
Contains the number of conduit nodes that have been written to m_cur_spill_dir.
std::unordered_map< int, size_t > map_is_t
std::mutex m_mutex
used in set_conduit_node(...)
void set_finished_building_map()
Special handling for ras_lipid_conduit_data_reader; may go away in the future.
void add_owner(int data_id, int owner)
Special handling for ras_lipid_conduit_data_reader; may go away in the future.
std::vector< conduit::Node > m_send_buffer
work space; used in exchange_data
std::vector< El::mpi::Request< El::byte > > m_recv_requests
bool m_world_master
convenience handles
map_pssi_t m_owner
Maps an index to the processor that owns the associated data First value of index is the sample ID an...
generic_data_reader * m_reader
std::vector< std::unordered_set< int > > m_indices_to_recv
std::string m_test_dir
The directory to use for testing checkpointing.
std::vector< size_t > m_incoming_msg_sizes
std::vector< int > m_recv_data_ids
Contains the list of data IDs that will be received.
std::unordered_map< int, conduit::Node > m_data
Contains the conduit nodes that are "owned" by this rank.
int m_np_in_trainer
number of procs in the trainer; convenience handle
std::string m_cur_spill_dir
Current directory for spilling (writing to file) conduit nodes.
std::unordered_map< int, conduit::Node > m_minibatch_data
std::unordered_map< std::pair< size_t, size_t >, int, size_t_pair_hash > map_pssi_t
map_is_t m_image_offsets
Maps a data_id to the image location in a shared memory segment.
std::size_t operator()(const std::pair< T1, T2 > &pair) const
void set_preloaded_owner_map(const std::unordered_map< int, int > &owner)
fills in m_owner, which maps index -> owning processor
data_store_conduit * copy() const
void PROFILE(T var1, Types... var2) const
map_ii_t m_spilled_nodes
maps data_id to m_cur_spill_dir_integer.
void DEBUG_DS(T var1, Types... var2)
int get_data_size()
for use during development and debugging
std::unordered_map< int, conduit::Node > m_data_cache
Contains a cache of the conduit nodes that are "owned" by this rank.
void set_is_local_cache(bool flag=true)
turns local cache mode on or off
void set_owner_map(const std::unordered_map< int, int > &m)
std::vector< conduit::Node > m_send_buffer_2