LBANN  0.103.0
Livermore Big Artificial Neural Network Toolkit
data_reader.hpp
Go to the documentation of this file.
1 // Copyright (c) 2014-2023, Lawrence Livermore National Security, LLC.
3 // Produced at the Lawrence Livermore National Laboratory.
4 // Written by the LBANN Research Team (B. Van Essen, et al.) listed in
5 // the CONTRIBUTORS file. <lbann-dev@llnl.gov>
6 //
7 // LLNL-CODE-697807.
8 // All rights reserved.
9 //
10 // This file is part of LBANN: Livermore Big Artificial Neural Network
11 // Toolkit. For details, see http://software.llnl.gov/LBANN or
12 // https://github.com/LLNL/LBANN.
13 //
14 // Licensed under the Apache License, Version 2.0 (the "License"); you
15 // may not use this file except in compliance with the License. You may
16 // obtain a copy of the License at:
17 //
18 // http://www.apache.org/licenses/LICENSE-2.0
19 //
20 // Unless required by applicable law or agreed to in writing, software
21 // distributed under the License is distributed on an "AS IS" BASIS,
22 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
23 // implied. See the License for the specific language governing
24 // permissions and limitations under the license.
25 //
26 // data_reader.hpp - Input data base class for training, testing
28 
29 #ifndef LBANN_DATA_READER_HPP
30 #define LBANN_DATA_READER_HPP
31 
32 #include "lbann/base.hpp"
35 #include "lbann/io/file_io.hpp"
38 #include "lbann/utils/options.hpp"
40 
41 #include <algorithm>
42 #include <cassert>
43 #include <string>
44 #include <unistd.h>
45 #include <unordered_set>
46 #include <vector>
47 
48 // Forward-declare Conduit nodes.
// conduit::Node only appears here as a reference or as a std::vector element
// type in function parameters (fetch(), fetch_conduit_node(),
// fetch_data_block_conduit()), so no Conduit header is included by this file.
49 namespace conduit {
50 class Node;
51 }
52 
/// Throw an lbann_exception saying that method @p n has not been implemented.
///
/// @p n is streamed (unparenthesized) into a std::stringstream, so it may be a
/// string literal, a variable, or a stream expression such as `"foo " << x`.
/// The temporary stream has a distinctive name so that an argument naming a
/// caller variable `s` is not shadowed by the macro's own local. (Previously
/// `NOT_IMPLEMENTED(s)` streamed the stream into itself.)
/// The expansion is a braced block, as before, so existing call sites with or
/// without a trailing semicolon keep compiling.
/// NOTE(review): std::stringstream needs <sstream>, which this header does not
/// include directly; it presumably arrives transitively via lbann/base.hpp.
#define NOT_IMPLEMENTED(n)                                                     \
  {                                                                            \
    std::stringstream lbann_not_implemented_msg_;                              \
    lbann_not_implemented_msg_ << "the method " << n                           \
                               << " has not been implemented";                 \
    throw lbann_exception(lbann_not_implemented_msg_.str());                   \
  }
59 
60 namespace lbann {
61 
62 // Forward declarations
// Incomplete types suffice because this header names them only through
// pointers/references: thread_pool via observer_ptr in setup(), persist& in
// the checkpoint methods, data_store_conduit via pointer/reference accessors.
// NOTE(review): Layer and trainer are not referenced in the visible part of
// this header; they may be stale.
63 class Layer;
64 class data_store_conduit;
65 class thread_pool;
66 class trainer;
67 class persist;
68 
76 {
77 public:
78  using unused_index_map_t = std::map<execution_mode, std::vector<int>>;
79 
83  generic_data_reader(bool shuffle = true)
84  : m_verbose(global_argument_parser().get<bool>(LBANN_OPTION_VERBOSE)),
85  m_data_store(nullptr),
86  m_comm(nullptr),
87  m_mini_batch_size(0),
88  m_current_pos(0),
89  m_stride_to_next_mini_batch(0),
90  m_base_offset(0),
91  m_sample_stride(1),
92  m_iteration_stride(1),
93  m_last_mini_batch_size(0),
94  m_stride_to_last_mini_batch(0),
95  m_reset_mini_batch_index(0),
96  m_loaded_mini_batch_idx(0),
97  m_current_mini_batch_idx(0),
98  m_num_iterations_per_epoch(0),
99  m_num_parallel_readers(0),
100  m_max_files_to_load(0),
101  m_file_dir(""),
102  m_data_sample_list(""),
103  m_data_fn(""),
104  m_label_fn(""),
105  m_shuffle(shuffle),
106  m_absolute_sample_count(0),
107  m_use_fraction(1.0),
108  m_gan_labelling(false), // default, not GAN
109  m_gan_label_value(
110  0), // If GAN, default for fake label, discriminator model
111  m_io_thread_pool(nullptr),
112  m_keep_sample_order(false),
113  m_issue_warning(true)
114  {
115  // By default only support fetching input samples
116  m_supported_input_types[INPUT_DATA_TYPE_SAMPLES] = true;
117  }
118  generic_data_reader(const generic_data_reader&) = default;
119  generic_data_reader& operator=(const generic_data_reader&) = default;
120 
121  virtual ~generic_data_reader();
122  virtual generic_data_reader* copy() const = 0;
123 
125  template <class Archive>
126  void serialize(Archive& ar);
127 
129  void set_comm(lbann_comm* comm) { m_comm = comm; }
130 
132  lbann_comm* get_comm() const { return m_comm; }
133 
134  virtual bool has_conduit_output() { return false; }
135 
136  // These non-virtual methods are used to specify where data is, how much to
137  // load, etc.
138 
142  void set_file_dir(std::string s);
143 
147  void set_local_file_dir(std::string s);
148 
153  void set_max_files_to_load(size_t n) { m_max_files_to_load = n; }
154 
159  std::string get_file_dir() const;
160 
165  std::string get_local_file_dir() const;
166 
172  void set_data_sample_list(std::string s);
173 
177  std::string get_data_sample_list() const;
178 
183  void keep_sample_order(bool same_order = false);
184 
191  void set_data_filename(std::string s);
192 
197  std::string get_data_filename() const;
198 
204  void set_label_filename(std::string s);
205 
211  std::string get_label_filename() const;
212 
217  void set_shuffle(bool b) { m_shuffle = b; }
218 
222  bool is_shuffled() const { return m_shuffle; }
223 
228  void set_shuffled_indices(const std::vector<int>& indices)
229  {
230  m_shuffled_indices = indices;
231  }
232 
236  const std::vector<int>& get_shuffled_indices() const
237  {
238  return m_shuffled_indices;
239  }
240 
248  void set_first_n(int n);
249 
254  void set_absolute_sample_count(size_t s);
255 
261  void set_use_fraction(double s);
262 
268  virtual void set_execution_mode_split_fraction(execution_mode m, double s);
269 
274  virtual void set_role(std::string role);
275 
279  std::string get_role() const { return m_role; }
280 
287  virtual void load() = 0;
288 
295  virtual void setup(int num_io_threads,
296  observer_ptr<thread_pool> io_thread_pool);
297 
299  virtual std::string get_type() const = 0;
300 
303  int fetch(std::map<data_field_type, CPUMat*>& input_buffers,
304  El::Matrix<El::Int>& indices_fetched,
305  size_t mb_size);
306 
307  int fetch(std::vector<conduit::Node>& samples,
308  El::Matrix<El::Int>& indices_fetched,
309  size_t mb_size);
310 
313  virtual bool has_data_field(data_field_type data_field) const
314  {
315  if (m_supported_input_types.find(data_field) !=
316  m_supported_input_types.end()) {
317  return m_supported_input_types.at(data_field);
318  }
319  else {
320  return false;
321  }
322  }
323 
324  virtual bool has_labels() const
325  {
326  return has_data_field(INPUT_DATA_TYPE_LABELS);
327  }
328  virtual bool has_responses() const
329  {
330  return has_data_field(INPUT_DATA_TYPE_RESPONSES);
331  }
332 
334  void set_has_data_field(data_field_type const data_field, const bool b)
335  {
336  m_supported_input_types[data_field] = b;
337  }
338 
340  virtual void set_has_labels(const bool b)
341  {
342  m_supported_input_types[INPUT_DATA_TYPE_LABELS] = b;
343  }
345  virtual void set_has_responses(const bool b)
346  {
347  m_supported_input_types[INPUT_DATA_TYPE_RESPONSES] = b;
348  }
349 
350  void start_data_store_mini_batch_exchange();
351  void finish_data_store_mini_batch_exchange();
352 
358  virtual bool update(bool is_active_reader);
359 
369  virtual int get_num_labels() const { return 0; }
372  virtual int get_num_responses() const { return 1; }
374  virtual int get_linearized_data_size() const { return 0; }
376  virtual int get_linearized_label_size() const { return 0; }
378  virtual int get_linearized_response_size() const { return 1; }
380  virtual int get_linearized_size(data_field_type const& data_field) const;
381 
383  virtual const std::vector<El::Int> get_data_dims() const
384  {
385  return std::vector<El::Int>(0);
386  }
387 
388  virtual std::vector<El::Int>
389  get_slice_points(const slice_points_mode var_category, bool& is_supported)
390  {
391  is_supported = false;
392  return {};
393  }
394 
396  virtual bool position_valid() const
397  {
398  return (m_current_pos < get_num_data());
399  }
403  virtual bool position_is_overrun() const
404  {
405  int end_pos = (int)m_shuffled_indices.size();
406  return (m_current_pos >= end_pos &&
407  (m_current_pos - end_pos) < m_comm->get_procs_per_trainer());
408  }
410  bool at_new_epoch() const
411  {
414  return ((m_loaded_mini_batch_idx == m_reset_mini_batch_index) &&
415  (m_current_mini_batch_idx == 0));
416  }
418  void set_mini_batch_size(const int s);
420  int get_mini_batch_size() const { return m_mini_batch_size; }
422  int get_loaded_mini_batch_size() const;
424  int get_current_mini_batch_size() const;
426  int get_mini_batch_max() const { return m_mini_batch_size; }
// NOTE(review): the signature lines of the next two definitions (listing
// lines 428 and 433) were dropped from this extract. Per the member index
// they are presumably `void set_stride_to_next_mini_batch(const int s)`
// ("Set the mini batch stride") and
// `int get_stride_to_next_mini_batch() const` ("Return the mini batch stride").
// Setter body: stores s in m_stride_to_next_mini_batch.
429 {
430  m_stride_to_next_mini_batch = s;
431  }
// Getter body: returns m_stride_to_next_mini_batch.
434 {
435  return m_stride_to_next_mini_batch;
436  }
438  void set_sample_stride(const int s) { m_sample_stride = s; }
440  int get_sample_stride() const { return m_sample_stride; }
442  void set_iteration_stride(const int s) { m_iteration_stride = s; }
444  int get_iteration_stride() const { return m_iteration_stride; }
446  virtual void set_base_offset(const int s) { m_base_offset = s; }
448  int get_base_offset() const { return m_base_offset; }
450  void set_last_mini_batch_size(const int s) { m_last_mini_batch_size = s; }
452  int get_last_mini_batch_size() const { return m_last_mini_batch_size; }
// NOTE(review): the signature lines of the next two definitions (listing
// lines 454 and 459) were dropped from this extract. Per the member index
// they are presumably `void set_stride_to_last_mini_batch(const int s)`
// ("Set the last mini batch stride") and
// `int get_stride_to_last_mini_batch() const` ("Return the last mini batch
// stride").
// Setter body: stores s in m_stride_to_last_mini_batch.
455 {
456  m_stride_to_last_mini_batch = s;
457  }
// Getter body: returns m_stride_to_last_mini_batch.
460 {
461  return m_stride_to_last_mini_batch;
462  }
464  void set_num_parallel_readers(const int s) { m_num_parallel_readers = s; }
466  int get_num_parallel_readers() const { return m_num_parallel_readers; }
468  virtual void set_reset_mini_batch_index(const int s)
469  {
470  m_reset_mini_batch_index = s;
471  }
473  int get_reset_mini_batch_index() const { return m_reset_mini_batch_index; }
475  int get_loaded_mini_batch_index() const { return m_loaded_mini_batch_idx; }
477  int get_current_mini_batch_index() const { return m_current_mini_batch_idx; }
// NOTE(review): the signature line (listing line 479) was dropped from this
// extract; per the member index this is presumably the body of
// `void set_initial_position()`.
// Rewinds the reader: position goes back to the base offset, the loaded
// mini-batch index back to the reset index, and the current mini-batch
// index to 0.
480 {
481  m_current_pos = m_base_offset;
482  m_loaded_mini_batch_idx = m_reset_mini_batch_index;
483  m_current_mini_batch_idx = 0;
484  }
486  int get_position() const { return m_current_pos; }
488  int get_next_position() const;
490  int* get_indices() { return &m_shuffled_indices[0]; }
492  virtual int get_num_data() const { return (int)m_shuffled_indices.size(); }
494  int get_num_unused_data(execution_mode m) const;
495 
497  int* get_unused_data(execution_mode m);
498 
499  const std::vector<int>& get_unused_indices(execution_mode m);
500 
502  void set_num_iterations_per_epoch(int num_iterations_per_epoch)
503  {
504  m_num_iterations_per_epoch =
505  num_iterations_per_epoch;
506  }
// NOTE(review): the signature line (listing line 509) was dropped from this
// extract; per the member index this is presumably the body of
// `int get_num_iterations_per_epoch() const`, which returns
// m_num_iterations_per_epoch.
510 {
511  return m_num_iterations_per_epoch;
512  }
514 
517  int get_current_step_in_epoch() const { return m_current_mini_batch_idx; }
518 
524  void resize_shuffled_indices();
525 
532  void select_subset_of_data();
533 
538  virtual void use_unused_index_set(execution_mode m);
539 
541  virtual bool has_list_per_model() const { return false; }
543  virtual bool has_list_per_trainer() const { return false; }
544 
547  bool save_to_checkpoint_shared(persist& p, execution_mode mode);
548 
551  bool load_from_checkpoint_shared(persist& p, execution_mode mode);
552 
553  bool save_to_checkpoint_distributed(persist& p, execution_mode mode);
554 
557  bool load_from_checkpoint_distributed(persist& p, execution_mode mode);
558 
560  const data_store_conduit& get_data_store() const;
561 
563  data_store_conduit& get_data_store();
564 
565  data_store_conduit* get_data_store_ptr() const { return m_data_store; }
566 
571  void setup_data_store(int mini_batch_size);
572 
573  void instantiate_data_store();
574 
575  virtual void preload_data_store();
576 
577  void set_gan_labelling(bool has_gan_labelling)
578  {
579  m_gan_labelling = has_gan_labelling;
580  }
581  void set_gan_label_value(int gan_label_value)
582  {
583  m_gan_label_value = gan_label_value;
584  }
585 
587  void set_data_store(data_store_conduit* g);
588 
589  virtual bool data_store_active() const;
590 
591  virtual bool priming_data_store() const;
592 
595  virtual void post_update() {}
596 
// NOTE(review): the signature line (listing line 598) was dropped from this
// extract; per the member index this is presumably the body of
// `void set_transform_pipeline(transform::transform_pipeline&& tp)`.
// Takes ownership of the pipeline by moving it into m_transform_pipeline.
// std::move needs <utility>, which is not included directly here.
599 {
600  m_transform_pipeline = std::move(tp);
601  }
602 
603 #ifdef LBANN_HAS_DISTCONV
// Only compiled when LBANN_HAS_DISTCONV is defined. The base reader always
// reports that a tensor shuffle is required; it is virtual so derived
// readers may answer differently.
604 
608  virtual bool is_tensor_shuffle_required() const { return true; }
609 #endif // LBANN_HAS_DISTCONV
610 
611 protected:
// Verbosity flag. The visible constructor initializes it from
// LBANN_OPTION_VERBOSE, so the in-class "= false" only applies to
// constructors that do not initialize it explicitly.
612  bool m_verbose = false;
613 
614  // For use with conduit when samples are corrupt.
// Declared mutable so const member functions may modify it.
// NOTE(review): presumably holds ids of samples served by a random
// substitute node; confirm against the .cpp.
615  mutable std::unordered_set<int> m_using_random_node;
616 
// Read-side counterparts of set_absolute_sample_count(), set_use_fraction()
// and set_execution_mode_split_fraction(); all defined out of line.
621  size_t get_absolute_sample_count() const;
622 
628  double get_use_fraction() const;
629 
634  double get_execution_mode_split_fraction(execution_mode m) const;
635 
// NOTE(review): listing lines 636-639 were dropped from this extract.
637 
639 
640  virtual bool
641  fetch_data_block(std::map<data_field_type, CPUMat*>& input_buffers,
642  El::Int block_offset,
643  El::Int block_stride,
644  El::Int mb_size,
645  El::Matrix<El::Int>& indices_fetched);
646 
647  bool fetch_data_block_conduit(std::vector<conduit::Node>& samples,
648  El::Int block_offset,
649  El::Int block_stride,
650  El::Int mb_size,
651  El::Matrix<El::Int>& indices_fetched);
652 
665  virtual bool fetch_data_field(data_field_type data_field,
666  CPUMat& Y,
667  int data_id,
668  int mb_idx)
669  {
670  NOT_IMPLEMENTED("fetch_data_field");
671  return false;
672  }
673 
674  virtual bool fetch_conduit_node(conduit::Node& sample, int data_id)
675  {
676  NOT_IMPLEMENTED("fetch_conduit_node");
677  return false;
678  }
679 
686  virtual bool fetch_datum(CPUMat& X, int data_id, int mb_idx)
687  {
688  NOT_IMPLEMENTED("fetch_dataum");
689  return false;
690  }
691 
698  virtual bool fetch_label(CPUMat& Y, int data_id, int mb_idx)
699  {
700  NOT_IMPLEMENTED("fetch_label");
701  return false;
702  }
703 
710  virtual bool fetch_response(CPUMat& Y, int data_id, int mb_idx)
711  {
712  NOT_IMPLEMENTED("fetch_response");
713  return false;
714  }
715 
722  inline CPUMat create_datum_view(CPUMat& X, const int mb_idx)
723  {
724  return El::View(X, El::IR(0, X.Height()), El::IR(mb_idx, mb_idx + 1));
725  }
726 
730  virtual void preprocess_data_source(int tid){};
734  virtual void postprocess_data_source(int tid){};
735 
737  virtual void shuffle_indices();
739  virtual void shuffle_indices(rng_gen& gen);
740 
// NOTE(review): the data members below are in a public section. Several
// member declarations between listing lines 742 and 774 (e.g. the mini-batch
// counters referenced by the constructor) were dropped from this extract.
741 public:
755 
// Sample indices in visiting order (shuffled when shuffling is enabled);
// get_num_data() reports its size.
756  std::vector<int> m_shuffled_indices;
759 
// NOTE(review): this declaration is truncated in the extract (type only).
769  int
771 
773 
// Locations/names presumably stored by set_file_dir, set_local_file_dir,
// set_data_sample_list, set_data_filename and set_label_filename (all
// defined out of line).
775  std::string m_file_dir;
776  std::string m_local_file_dir;
777  std::string m_data_sample_list;
778  std::string m_data_fn;
779  std::string m_label_fn;
// Shuffling flag (set_shuffle / is_shuffled).
780  bool m_shuffle;
// Per-execution-mode split fraction.
782  std::map<execution_mode, double> m_execution_mode_split_fraction;
// Role string returned by get_role().
785  std::string m_role;
786 
793  void print_get_methods(const std::string filename);
794 
799  size_t get_num_indices_to_use() const;
800 
803 
804  void set_use_data_store(bool s) { m_use_data_store = s; }
805 
806 private:
807  virtual void do_preload_data_store(); // Throws: "Not implemented."
808 
809 protected:
// Whether the data store is in use; set via set_use_data_store().
810  bool m_use_data_store = false;
811 
// Holds a true value for each supported input data type. An ordered
// std::map is used (presumably so checkpoint contents are deterministic).
814  std::map<data_field_type, bool> m_supported_input_types;
815 
816  // Variables supporting GAN labelling.
817  bool m_gan_labelling; // whether this is a GAN binary label, default
818  // is false
819  int m_gan_label_value; // zero(0) or 1 label value for discriminator, default
820  // is 0
821 
// NOTE(review): listing lines 822-837 were dropped from this extract.
823 
827 
830 
835 
// Defined out of line; presumably validates the configured sample counts.
838  void error_check_counts() const;
839 };
840 
841 template <typename T>
842 inline void set_minibatch_item(Mat& M,
843  const int mb_idx,
844  const T* const ptr,
845  const size_t count)
846 {
847  if ((count > 0u) && (ptr == nullptr)) {
848  throw lbann_exception(std::string{} + __FILE__ + " " +
849  std::to_string(__LINE__) +
850  " :: attempt to dereference a nullptr ");
851  }
852  for (size_t i = 0u; i < count; ++i) {
853  M.Set(static_cast<El::Int>(i),
854  static_cast<El::Int>(mb_idx),
855  static_cast<DataType>(ptr[i]));
856  }
857 }
858 
859 } // namespace lbann
860 
861 #endif // LBANN_DATA_READER_HPP
virtual bool has_conduit_output()
virtual bool has_data_field(data_field_type data_field) const
Check to see if the data reader supports this specific data field.
int get_last_mini_batch_size() const
Return the last mini batch size.
void set_sample_stride(const int s)
Set the sample stride.
void set_stride_to_last_mini_batch(const int s)
Set the last mini batch stride.
virtual int get_linearized_response_size() const
Get the linearized size (i.e. number of elements) in a response.
int * get_indices()
Get a pointer to the start of the shuffled indices.
virtual bool fetch_datum(CPUMat &X, int data_id, int mb_idx)
bool at_new_epoch() const
True if the data reader is at the start of an epoch.
void set_stride_to_next_mini_batch(const int s)
Set the mini batch stride.
std::map< data_field_type, bool > m_supported_input_types
Holds a true value for each input data type that is supported. Use an ordered map so that checkpoints...
virtual bool fetch_response(CPUMat &Y, int data_id, int mb_idx)
void set_num_iterations_per_epoch(int num_iterations_per_epoch)
Set the number of iterations in each epoch.
int get_sample_stride() const
Return the sample stride.
#define INPUT_DATA_TYPE_LABELS
slice_points_mode
Definition: metadata.hpp:66
#define LBANN_OPTION_VERBOSE
Definition: options.hpp:62
void set_initial_position()
Set the current position based on the base and model offsets.
std::map< execution_mode, double > m_execution_mode_split_fraction
CPUMat create_datum_view(CPUMat &X, const int mb_idx)
void serialize(std::ostream &os, google::protobuf::Message const &msg)
Serialize the protobuf message to a stream.
int get_iteration_stride() const
Return the iteration stride.
virtual bool position_valid() const
True if the data reader's current position is valid.
int get_stride_to_next_mini_batch() const
Return the mini batch stride.
virtual bool fetch_label(CPUMat &Y, int data_id, int mb_idx)
void load(std::string const &pbuf_filename, google::protobuf::Message &msg)
Fill the protobuf message from a binary file.
data_store_conduit * m_data_store
std::vector< int > m_shuffled_indices
int get_stride_to_last_mini_batch() const
Return the last mini batch stride.
int get_num_parallel_readers() const
Return the number of parallel readers per model.
virtual bool has_responses() const
int get_mini_batch_size() const
Get the mini batch size.
virtual void set_reset_mini_batch_index(const int s)
Set the starting mini-batch index for the epoch.
virtual void set_has_labels(const bool b)
Whether or not a data reader has labels.
std::string to_string(El::Device const &d)
std::map< execution_mode, std::vector< int > > unused_index_map_t
Definition: data_reader.hpp:78
virtual int get_num_data() const
Get the number of samples in this dataset.
#define INPUT_DATA_TYPE_SAMPLES
void set_gan_label_value(int gan_label_value)
std::mt19937 rng_gen
std::unordered_set< int > m_using_random_node
void set_iteration_stride(const int s)
Set the iteration stride.
int m_loaded_mini_batch_idx
The index of the current mini-batch that has been loaded.
#define NOT_IMPLEMENTED(n)
Definition: data_reader.hpp:53
unused_index_map_t m_unused_indices
Record of the indices that are not being used for training.
virtual bool fetch_data_field(data_field_type data_field, CPUMat &Y, int data_id, int mb_idx)
Called by fetch_data, fetch_label, fetch_response.
virtual std::vector< El::Int > get_slice_points(const slice_points_mode var_category, bool &is_supported)
int m_iteration_stride
Stride used by parallel data readers within the model.
El::Matrix< DataType, El::Device::CPU > CPUMat
Definition: base.hpp:116
const std::vector< int > & get_shuffled_indices() const
int get_mini_batch_max() const
Return the full mini_batch_size.
typename std::add_pointer< T >::type observer_ptr
Creating an observer_ptr to complement the unique_ptr and shared_ptr.
Definition: base.hpp:54
void set_comm(lbann_comm *comm)
Set the comm object.
execution_mode
Neural network execution mode.
Definition: base.hpp:229
virtual bool has_list_per_model() const
Does the data reader have a unique sample list per model.
exception lbann_exception
Definition: exception.hpp:145
virtual bool fetch_conduit_node(conduit::Node &sample, int data_id)
std::string get_role() const
std::string get()
#define INPUT_DATA_TYPE_RESPONSES
int m_num_parallel_readers
How many parallel readers are being used.
transform::transform_pipeline m_transform_pipeline
void set_last_mini_batch_size(const int s)
Set the last mini batch size.
virtual void preprocess_data_source(int tid)
void set_has_data_field(data_field_type const data_field, const bool b)
Whether or not a data reader has a data field.
lbann_comm * get_comm() const
Returns a (possibly null) pointer to the comm object.
virtual void set_base_offset(const int s)
Set the base offset.
void set_gan_labelling(bool has_gan_labelling)
generic_data_reader(bool shuffle=true)
Definition: data_reader.hpp:83
El::Matrix< DataType, El::Device::CPU > Mat
Definition: base.hpp:185
int get_loaded_mini_batch_index() const
Return the loaded mini-batch index for the epoch.
virtual void set_has_responses(const bool b)
Whether or not a data reader has a response field.
size_t m_max_files_to_load
Maximum number of files to load.
observer_ptr< thread_pool > m_io_thread_pool
virtual bool position_is_overrun() const
std::string data_field_type
void set_max_files_to_load(size_t n)
void set_shuffled_indices(const std::vector< int > &indices)
void set_num_parallel_readers(const int s)
Set the number of parallel readers per model.
virtual int get_linearized_data_size() const
Get the linearized size (i.e. number of elements) in a sample.
default_arg_parser_type & global_argument_parser()
int get_current_mini_batch_index() const
Return the current mini-batch index for the epoch.
virtual int get_linearized_label_size() const
Get the linearized size (i.e. number of elements) in a label.
virtual const std::vector< El::Int > get_data_dims() const
Get the dimensions of the data.
virtual int get_num_responses() const
Return the number of responses in this dataset.
int get_num_iterations_per_epoch() const
Get the number of iterations in each epoch.
virtual void postprocess_data_source(int tid)
virtual bool has_labels() const
void set_minibatch_item(Mat &M, const int mb_idx, const T *const ptr, const size_t count)
virtual bool has_list_per_trainer() const
Does the data reader have a unique sample list per trainer.
data_store_conduit * get_data_store_ptr() const
int get_current_step_in_epoch() const
int get_reset_mini_batch_index() const
Return the starting mini-batch index for the epoch.
int get_position() const
Get the current position in the data reader.
void set_transform_pipeline(transform::transform_pipeline &&tp)
int m_reset_mini_batch_index
The index at which this data reader starts its epoch.
int get_base_offset() const
Return the base offset.