casacore
Loading...
Searching...
No Matches
SiscoWriter.h
Go to the documentation of this file.
1#ifndef SISCO_SISCO_WRITER_H_
2#define SISCO_SISCO_WRITER_H_
3
4#include <complex>
5#include <fstream>
6#include <list>
7#include <map>
8#include <set>
9#include <span>
10#include <string>
11#include <vector>
12
13#include "ConditionalQueue.h"
14#include "Deflate.h"
15#include "Lane.h"
16#include "Sisco.h"
17
18namespace casacore::sisco {
19
25 public:
26 SiscoWriter(const std::string& filename, int predict_level,
27 int deflate_level);
30 if (file_.is_open()) Close();
31 }
33
34 void Open(std::span<const std::byte> header_data);
35 void Write(size_t baseline_index, std::span<const std::complex<float>> data);
36 void Close();
37
38 private:
39 struct Chunk {
40 size_t index;
41 std::vector<std::byte> mantisas_data;
42 std::vector<std::byte> exponent_data;
44 // Whenever a thread finishes preprocessing, it will update this value. If
45 // the value is high enough, the thread knows the chunk is ready for
46 // deflate.
48 size_t n_finished_items = 0;
49 size_t item_counter = 0;
50 size_t chunk_size = 0;
51 };
53 std::vector<float> real_data;
54 std::vector<float> imaginary_data;
56 std::byte* mantisa_position;
59 };
60 struct WriteTask {
61 size_t index;
63 std::vector<std::byte> data;
64 };
70 struct ThreadData {};
71
72 void NewChunk(std::unique_lock<std::mutex>& lock);
73 void RemoveChunk(const Chunk* chunk);
75 void WriteLoop();
77 std::unique_lock<std::mutex>& lock);
78 void WriteChunk(size_t uncompressed_size, std::span<const std::byte> data);
79 void DeflateChunk(Chunk& chunk);
80
81 // Chunk data
82 constexpr static size_t kDefaultChunkSize = 1024 * 1024;
86 size_t n_chunks_ = 0;
87 // A list is used because we do not want to invalidate pointers when
88 // adding/revoming members. Access requires holding the mutex.
89 std::list<Chunk> chunks_;
91
92 // Indexed by baseline_index.
93 std::map<size_t, BaselineData> baseline_data_;
94 std::ofstream file_;
95 std::string filename_;
100
101 std::mutex mutex_;
102 std::condition_variable chunk_finished_condition_;
105 std::set<size_t> busy_baselines_;
106 std::vector<std::thread> compression_threads_;
107 std::thread write_thread_;
108};
109
110} // namespace casacore::sisco
111
112#endif
Internal header file for the Lane.
The Lane is an efficient cyclic buffer that is synchronized.
Definition Lane.h:100
A queue with a limited size and the ability to query only specific values.
void RemoveChunk(const Chunk *chunk)
aocommon::Lane< WriteTask > write_tasks_
Definition SiscoWriter.h:99
std::set< size_t > busy_baselines_
Holds a list of baseline ids currently being preprocessed.
static constexpr size_t kDefaultChunkSize
Chunk data.
Definition SiscoWriter.h:82
void Preprocess(SiscoWriter::PreprocessingTask &task, std::unique_lock< std::mutex > &lock)
std::condition_variable chunk_finished_condition_
SiscoWriter(SiscoWriter &&)=delete
ConditionalQueue< PreprocessingTask > preprocessing_tasks_
Definition SiscoWriter.h:98
std::list< Chunk > chunks_
A list is used because we do not want to invalidate pointers when adding/revoming members.
Definition SiscoWriter.h:89
std::vector< std::thread > compression_threads_
void Write(size_t baseline_index, std::span< const std::complex< float > > data)
SiscoWriter & operator=(SiscoWriter &&)=delete
SiscoWriter(const std::string &filename, int predict_level, int deflate_level)
std::map< size_t, BaselineData > baseline_data_
Indexed by baseline_index.
Definition SiscoWriter.h:93
void NewChunk(std::unique_lock< std::mutex > &lock)
void DeflateChunk(Chunk &chunk)
void WriteChunk(size_t uncompressed_size, std::span< const std::byte > data)
void Open(std::span< const std::byte > header_data)
std::vector< std::byte > mantisas_data
Definition SiscoWriter.h:41
size_t n_allocated_items
Whenever a thread finishes preprocessing, it will update this value.
Definition SiscoWriter.h:47
std::vector< std::byte > exponent_data
Definition SiscoWriter.h:42