29 #ifndef LBANN_DATA_READER_HPP 30 #define LBANN_DATA_READER_HPP 45 #include <unordered_set> 53 #define NOT_IMPLEMENTED(n) \ 55 std::stringstream s; \ 56 s << "the method " << n << " has not been implemented"; \ 57 throw lbann_exception(s.str()); \ 64 class data_store_conduit;
85 m_data_store(nullptr),
89 m_stride_to_next_mini_batch(0),
92 m_iteration_stride(1),
93 m_last_mini_batch_size(0),
94 m_stride_to_last_mini_batch(0),
95 m_reset_mini_batch_index(0),
96 m_loaded_mini_batch_idx(0),
97 m_current_mini_batch_idx(0),
98 m_num_iterations_per_epoch(0),
99 m_num_parallel_readers(0),
100 m_max_files_to_load(0),
102 m_data_sample_list(
""),
106 m_absolute_sample_count(0),
108 m_gan_labelling(false),
111 m_io_thread_pool(nullptr),
112 m_keep_sample_order(false),
113 m_issue_warning(true)
125 template <
class Archive>
142 void set_file_dir(std::string s);
147 void set_local_file_dir(std::string s);
159 std::string get_file_dir()
const;
165 std::string get_local_file_dir()
const;
172 void set_data_sample_list(std::string s);
177 std::string get_data_sample_list()
const;
183 void keep_sample_order(
bool same_order =
false);
191 void set_data_filename(std::string s);
197 std::string get_data_filename()
const;
204 void set_label_filename(std::string s);
211 std::string get_label_filename()
const;
230 m_shuffled_indices = indices;
238 return m_shuffled_indices;
248 void set_first_n(
int n);
254 void set_absolute_sample_count(
size_t s);
261 void set_use_fraction(
double s);
268 virtual void set_execution_mode_split_fraction(
execution_mode m,
double s);
274 virtual void set_role(std::string role);
287 virtual void load() = 0;
295 virtual void setup(
int num_io_threads,
299 virtual std::string get_type()
const = 0;
303 int fetch(std::map<data_field_type, CPUMat*>& input_buffers,
304 El::Matrix<El::Int>& indices_fetched,
307 int fetch(std::vector<conduit::Node>& samples,
308 El::Matrix<El::Int>& indices_fetched,
315 if (m_supported_input_types.find(data_field) !=
316 m_supported_input_types.end()) {
317 return m_supported_input_types.at(data_field);
336 m_supported_input_types[data_field] = b;
350 void start_data_store_mini_batch_exchange();
351 void finish_data_store_mini_batch_exchange();
358 virtual bool update(
bool is_active_reader);
369 virtual int get_num_labels()
const {
return 0; }
380 virtual int get_linearized_size(
data_field_type const& data_field)
const;
385 return std::vector<El::Int>(0);
388 virtual std::vector<El::Int>
391 is_supported =
false;
398 return (m_current_pos < get_num_data());
405 int end_pos = (int)m_shuffled_indices.size();
406 return (m_current_pos >= end_pos &&
407 (m_current_pos - end_pos) < m_comm->get_procs_per_trainer());
414 return ((m_loaded_mini_batch_idx == m_reset_mini_batch_index) &&
415 (m_current_mini_batch_idx == 0));
418 void set_mini_batch_size(
const int s);
422 int get_loaded_mini_batch_size()
const;
424 int get_current_mini_batch_size()
const;
430 m_stride_to_next_mini_batch = s;
435 return m_stride_to_next_mini_batch;
456 m_stride_to_last_mini_batch = s;
461 return m_stride_to_last_mini_batch;
470 m_reset_mini_batch_index = s;
481 m_current_pos = m_base_offset;
482 m_loaded_mini_batch_idx = m_reset_mini_batch_index;
483 m_current_mini_batch_idx = 0;
488 int get_next_position()
const;
492 virtual int get_num_data()
const {
return (
int)m_shuffled_indices.size(); }
504 m_num_iterations_per_epoch =
505 num_iterations_per_epoch;
511 return m_num_iterations_per_epoch;
524 void resize_shuffled_indices();
532 void select_subset_of_data();
571 void setup_data_store(
int mini_batch_size);
573 void instantiate_data_store();
575 virtual void preload_data_store();
579 m_gan_labelling = has_gan_labelling;
583 m_gan_label_value = gan_label_value;
589 virtual bool data_store_active()
const;
591 virtual bool priming_data_store()
const;
600 m_transform_pipeline = std::move(tp);
603 #ifdef LBANN_HAS_DISTCONV 608 virtual bool is_tensor_shuffle_required()
const {
return true; }
609 #endif // LBANN_HAS_DISTCONV 612 bool m_verbose =
false;
621 size_t get_absolute_sample_count()
const;
628 double get_use_fraction()
const;
641 fetch_data_block(std::map<data_field_type, CPUMat*>& input_buffers,
642 El::Int block_offset,
643 El::Int block_stride,
645 El::Matrix<El::Int>& indices_fetched);
647 bool fetch_data_block_conduit(std::vector<conduit::Node>& samples,
648 El::Int block_offset,
649 El::Int block_stride,
651 El::Matrix<El::Int>& indices_fetched);
724 return El::View(X, El::IR(0, X.Height()), El::IR(mb_idx, mb_idx + 1));
737 virtual void shuffle_indices();
739 virtual void shuffle_indices(
rng_gen& gen);
793 void print_get_methods(
const std::string filename);
799 size_t get_num_indices_to_use()
const;
807 virtual void do_preload_data_store();
810 bool m_use_data_store =
false;
838 void error_check_counts()
const;
841 template <
typename T>
847 if ((count > 0u) && (ptr ==
nullptr)) {
850 " :: attempt to dereference a nullptr ");
852 for (
size_t i = 0u; i < count; ++i) {
853 M.Set(static_cast<El::Int>(i),
854 static_cast<El::Int>(mb_idx),
855 static_cast<DataType>(ptr[i]));
861 #endif // LBANN_DATA_READER_HPP virtual bool has_conduit_output()
virtual bool has_data_field(data_field_type data_field) const
Check to see if the data reader supports this specific data field.
virtual void post_update()
int get_last_mini_batch_size() const
Return the last mini batch size.
void set_sample_stride(const int s)
Set the sample stride.
void set_stride_to_last_mini_batch(const int s)
Set the last mini batch stride.
std::string m_local_file_dir
virtual int get_linearized_response_size() const
Get the linearized size (i.e. number of elements) in a response.
int * get_indices()
Get a pointer to the start of the shuffled indices.
virtual bool fetch_datum(CPUMat &X, int data_id, int mb_idx)
bool at_new_epoch() const
True if the data reader is at the start of an epoch.
void set_stride_to_next_mini_batch(const int s)
Set the mini batch stride.
int m_stride_to_last_mini_batch
std::map< data_field_type, bool > m_supported_input_types
Holds a true value for each input data type that is supported. Use an ordered map so that checkpoints...
virtual bool fetch_response(CPUMat &Y, int data_id, int mb_idx)
void set_num_iterations_per_epoch(int num_iterations_per_epoch)
Set the number of iterations in each epoch.
int get_sample_stride() const
Return the sample stride.
#define LBANN_OPTION_VERBOSE
void set_initial_position()
Set the current position based on the base and model offsets.
std::map< execution_mode, double > m_execution_mode_split_fraction
CPUMat create_datum_view(CPUMat &X, const int mb_idx)
void serialize(std::ostream &os, google::protobuf::Message const &msg)
Serialize the protobuf message to a stream.
int get_iteration_stride() const
Return the iteration stride.
virtual bool position_valid() const
True if the data reader's current position is valid.
int get_stride_to_next_mini_batch() const
Return the mini batch stride.
virtual bool fetch_label(CPUMat &Y, int data_id, int mb_idx)
void load(std::string const &pbuf_filename, google::protobuf::Message &msg)
Fill the protobuf message from a binary file.
data_store_conduit * m_data_store
std::vector< int > m_shuffled_indices
int m_stride_to_next_mini_batch
void set_use_data_store(bool s)
int get_stride_to_last_mini_batch() const
Return the last mini batch stride.
size_t m_absolute_sample_count
int get_num_parallel_readers() const
Return the number of parallel readers per model.
virtual bool has_responses() const
int get_mini_batch_size() const
Get the mini batch size.
virtual void set_reset_mini_batch_index(const int s)
Set the starting mini-batch index for the epoch.
virtual void set_has_labels(const bool b)
Whether or not a data reader has labels.
std::string to_string(El::Device const &d)
std::map< execution_mode, std::vector< int > > unused_index_map_t
virtual int get_num_data() const
Get the number of samples in this dataset.
void set_gan_label_value(int gan_label_value)
std::unordered_set< int > m_using_random_node
void set_iteration_stride(const int s)
Set the iteration stride.
int m_loaded_mini_batch_idx
The index of the current mini-batch that has been loaded.
#define NOT_IMPLEMENTED(n)
unused_index_map_t m_unused_indices
Record of the indices that are not being used for training.
virtual bool fetch_data_field(data_field_type data_field, CPUMat &Y, int data_id, int mb_idx)
Called by fetch_data, fetch_label, fetch_response.
virtual std::vector< El::Int > get_slice_points(const slice_points_mode var_category, bool &is_supported)
int m_iteration_stride
Stride used by parallel data readers within the model.
El::Matrix< DataType, El::Device::CPU > CPUMat
const std::vector< int > & get_shuffled_indices() const
int get_mini_batch_max() const
Return the full mini_batch_size.
int m_current_mini_batch_idx
typename std::add_pointer< T >::type observer_ptr
Creating an observer_ptr to complement the unique_ptr and shared_ptr.
void set_comm(lbann_comm *comm)
set the comm object
execution_mode
Neural network execution mode.
virtual bool has_list_per_model() const
Does the data reader have a unique sample list per model.
exception lbann_exception
virtual bool fetch_conduit_node(conduit::Node &sample, int data_id)
std::string get_role() const
int m_num_parallel_readers
How many parallel readers are being used.
transform::transform_pipeline m_transform_pipeline
void set_last_mini_batch_size(const int s)
Set the last mini batch size.
virtual void preprocess_data_source(int tid)
void set_has_data_field(data_field_type const data_field, const bool b)
Whether or not a data reader has a data field.
lbann_comm * get_comm() const
Returns a (possibly null) pointer to the comm object.
virtual void set_base_offset(const int s)
Set the base offset.
void set_gan_labelling(bool has_gan_labelling)
generic_data_reader(bool shuffle=true)
El::Matrix< DataType, El::Device::CPU > Mat
std::string m_data_sample_list
int get_loaded_mini_batch_index() const
Return the index of the current mini-batch that has been loaded.
virtual void set_has_responses(const bool b)
Whether or not a data reader has a response field.
size_t m_max_files_to_load
Maximum number of files to load.
observer_ptr< thread_pool > m_io_thread_pool
virtual bool position_is_overrun() const
std::string data_field_type
void set_max_files_to_load(size_t n)
void set_shuffled_indices(const std::vector< int > &indices)
void set_num_parallel_readers(const int s)
Set the number of parallel readers per model.
virtual int get_linearized_data_size() const
Get the linearized size (i.e. number of elements) in a sample.
default_arg_parser_type & global_argument_parser()
int get_current_mini_batch_index() const
Return the current mini-batch index for the epoch.
virtual int get_linearized_label_size() const
Get the linearized size (i.e. number of elements) in a label.
virtual const std::vector< El::Int > get_data_dims() const
Get the dimensions of the data.
virtual int get_num_responses() const
Return the number of responses in this dataset.
int get_num_iterations_per_epoch() const
Get the number of iterations in each epoch.
int m_last_mini_batch_size
virtual void postprocess_data_source(int tid)
virtual bool has_labels() const
void set_minibatch_item(Mat &M, const int mb_idx, const T *const ptr, const size_t count)
int m_num_iterations_per_epoch
virtual bool has_list_per_trainer() const
Does the data reader have a unique sample list per trainer.
data_store_conduit * get_data_store_ptr() const
int get_current_step_in_epoch() const
int get_reset_mini_batch_index() const
Return the starting mini-batch index for the epoch.
int get_position() const
Get the current position in the data reader.
void set_transform_pipeline(transform::transform_pipeline &&tp)
int m_reset_mini_batch_index
The index at which this data reader starts its epoch.
int get_base_offset() const
Return the base offset.