6#include "azure/storage/blobs/blob_options.hpp"
8#include <azure/core/io/body_stream.hpp>
14namespace Azure {
namespace Storage {
namespace Blobs {
namespace _detail {
// Scoped enumeration used as the return type of AvroSchema::Type() and as the
// argument of AvroSchema's explicit constructor.
// NOTE(review): the enumerator list is not visible in this excerpt.
15 enum class AvroDatumType
// Reader layered on a Core::IO::BodyStream. It keeps a pointer to the stream, a
// byte buffer (m_streambuffer) and a position (m_pos) that refers to that buffer.
// Copying is disabled.
33 class AvroStreamReader final {
// Position expressed relative to a byte vector. The constructor below
// initialises m_pos with {&m_streambuffer, 0} and AvailableBytes() reads
// m_pos.Offset, so an Offset member follows BufferPtr (its declaration, like
// that of m_pos itself, is not visible in this excerpt).
36 struct ReaderPos final
// Vector this position refers to; null until assigned.
38 const std::vector<uint8_t>* BufferPtr =
nullptr;
// Keeps only a raw pointer to `stream` and points m_pos at index 0 of the
// internal buffer.
// NOTE(review): since just the address is stored, the stream presumably has to
// outlive this reader - confirm with callers.
41 explicit AvroStreamReader(Core::IO::BodyStream& stream)
42 : m_stream(&stream), m_pos{&m_streambuffer, 0}
// Non-copyable: copy construction is deleted.
45 AvroStreamReader(
const AvroStreamReader&) =
delete;
// Non-copyable: copy assignment is deleted.
46 AvroStreamReader& operator=(
const AvroStreamReader&) =
delete;
// Declared here, defined elsewhere. Returns an int64_t.
// NOTE(review): presumably decodes an Avro variable-length integer from the
// stream - confirm against the implementation.
48 int64_t ParseInt(
const Core::Context& context);
// Declared here, defined elsewhere.
// NOTE(review): presumably moves the read position forward by n bytes - confirm.
49 void Advance(
size_t n,
const Core::Context& context);
// Declared here, defined elsewhere. Returns a size_t.
// NOTE(review): presumably buffers at least n bytes and reports how many are
// available; failure behaviour is not visible here - confirm.
52 size_t Preload(
size_t n,
const Core::Context& context);
// Declared here, defined elsewhere. Returns a size_t.
// NOTE(review): presumably a variant of Preload that tolerates a short read -
// confirm against the implementation.
53 size_t TryPreload(
size_t n,
const Core::Context& context);
// Number of bytes in m_streambuffer from m_pos.Offset to the end of the buffer.
// The subtraction is unsigned, so it is only meaningful while
// m_pos.Offset <= m_streambuffer.size().
58 size_t AvailableBytes()
const {
return m_streambuffer.size() - m_pos.Offset; }
// Source stream; set once in the constructor from the address of its argument.
61 Core::IO::BodyStream* m_stream;
// Byte buffer that m_pos points into and AvailableBytes() measures.
62 std::vector<uint8_t> m_streambuffer;
// AvroDatum is granted access to this class's non-public members.
65 friend class AvroDatum;
// Describes the schema of a datum: a type tag (m_type), a name (m_name) and,
// reached through m_status, field names, nested schemas and a size.
// NOTE(review): the declarations of m_name, m_type and the SharedStatus type are
// not visible in this excerpt.
68 class AvroSchema final {
// Predefined schema constants, one per name below; defined elsewhere.
70 static const AvroSchema StringSchema;
71 static const AvroSchema BytesSchema;
72 static const AvroSchema IntSchema;
73 static const AvroSchema LongSchema;
74 static const AvroSchema FloatSchema;
75 static const AvroSchema DoubleSchema;
76 static const AvroSchema BoolSchema;
77 static const AvroSchema NullSchema;
// Factory functions returning a schema by value; all are defined elsewhere.
// NOTE(review): the RecordSchema parameter list shown here may be incomplete
// (a line appears to be missing between its two visible lines).
78 static AvroSchema RecordSchema(
80 const std::vector<std::pair<std::string, AvroSchema>>& fieldsSchema);
81 static AvroSchema ArraySchema(AvroSchema elementSchema);
82 static AvroSchema MapSchema(AvroSchema elementSchema);
83 static AvroSchema UnionSchema(std::vector<AvroSchema> schemas);
84 static AvroSchema FixedSchema(std::string name, int64_t size);
// Returns a reference to the stored name.
86 const std::string& Name()
const {
return m_name; }
// Returns the stored type tag.
87 AvroDatumType Type()
const {
return m_type; }
// Returns the key list held in the shared state.
// NOTE(review): this and the three accessors below dereference m_status with no
// null check. The explicit constructor further down initialises only m_type, so
// unless m_status is assigned elsewhere these calls are invalid on a schema
// built that way - confirm against the factory implementations.
88 const std::vector<std::string>& FieldNames()
const {
return m_status->m_keys; }
// Returns a copy of the first nested schema. Indexing with [0] is unchecked, so
// m_schemas must be non-empty.
89 AvroSchema ItemSchema()
const {
return m_status->m_schemas[0]; }
// Returns the full list of nested schemas by reference.
90 const std::vector<AvroSchema>& FieldSchemas()
const {
return m_status->m_schemas; }
// Returns the shared state's m_size converted to size_t.
// NOTE(review): m_size's declared type is not visible here; a negative value
// would wrap in the cast - confirm it cannot occur.
91 size_t Size()
const {
return static_cast<size_t>(m_status->m_size); }
// Builds a schema carrying only a type tag; no other member is initialised here.
94 explicit AvroSchema(AvroDatumType type) : m_type(type) {}
// Reached as m_status->m_keys and m_status->m_schemas by the accessors above, so
// these presumably belong to the SharedStatus type (its header is not visible).
102 std::vector<std::string> m_keys;
103 std::vector<AvroSchema> m_schemas;
// Shared pointer to the state read by FieldNames/ItemSchema/FieldSchemas/Size.
106 std::shared_ptr<SharedStatus> m_status;
// A datum paired with the schema that describes it. The datum's data is referred
// to through an AvroStreamReader::ReaderPos (m_data).
109 class AvroDatum final {
// Default-constructed datum uses AvroSchema::NullSchema.
111 AvroDatum() : m_schema(AvroSchema::NullSchema) {}
// Takes the schema by value and moves it into m_schema.
112 explicit AvroDatum(AvroSchema schema) : m_schema(std::move(schema)) {}
// Declared here, defined elsewhere.
// NOTE(review): presumably reads this datum from the reader according to
// m_schema - confirm against the implementation.
114 void Fill(AvroStreamReader& reader,
const Core::Context& context);
// Overload taking a reader position instead of a reader; defined elsewhere.
115 void Fill(AvroStreamReader::ReaderPos& data);
// Returns the schema by const reference.
117 const AvroSchema& Schema()
const {
return m_schema; }
// Member template declared here; its definition or specialisations are not
// visible in this excerpt.
119 template <
class T> T Value()
const;
// NOTE(review): this member appears to belong to a nested type whose header is
// not visible; AvroDatum::StringView is referenced elsewhere in this file and is
// the likely owner - confirm.
122 const uint8_t* Data =
nullptr;
// Position associated with this datum.
128 AvroStreamReader::ReaderPos m_data;
// Alias for an ordered map from string keys to AvroDatum values.
131 using AvroMap = std::map<std::string, AvroDatum>;
// Record whose field names live in an external vector (m_keys, a pointer) and
// whose field values are stored in m_values; a field's position in *m_keys is
// used as its index into m_values.
133 class AvroRecord final {
// True when `key` occurs in *m_keys (FindField returns m_keys->size() otherwise).
// NOTE(review): m_keys defaults to nullptr and is dereferenced without a check.
135 bool HasField(
const std::string& key)
const {
return FindField(key) != m_keys->size(); }
// Looks the field up by name. The index goes through std::vector::at, which
// throws std::out_of_range when it is not a valid index of m_values.
136 const AvroDatum& Field(
const std::string& key)
const {
return m_values.at(FindField(key)); }
// Mutable overload of the lookup above.
137 AvroDatum& Field(
const std::string& key) {
return m_values.at(FindField(key)); }
// Bounds-checked access by position (std::vector::at).
138 const AvroDatum& FieldAt(
size_t i)
const {
return m_values.at(i); }
// Mutable overload of the positional access above.
139 AvroDatum& FieldAt(
size_t i) {
return m_values.at(i); }
// Linear search of *m_keys. Returns the index of the first element equal to
// `key`, or m_keys->size() when there is none (end() - begin()).
142 size_t FindField(
const std::string& key)
const
// NOTE(review): `find` is called unqualified; it presumably resolves to
// std::find through argument-dependent lookup - consider qualifying it.
144 auto i = find(m_keys->begin(), m_keys->end(), key);
// Iterator difference, implicitly converted to the size_t return type.
145 return i - m_keys->begin();
// Field names; a pointer to a vector owned elsewhere, null by default.
147 const std::vector<std::string>* m_keys =
nullptr;
// Field values, indexed by the positions FindField returns.
148 std::vector<AvroDatum> m_values;
// AvroDatum is granted access to this class's non-public members.
150 friend class AvroDatum;
// Reads AvroDatum objects one at a time from a BodyStream. It owns an
// AvroStreamReader and an AvroSchema through unique_ptr members.
153 class AvroObjectContainerReader final {
// Declared here, defined elsewhere.
155 explicit AvroObjectContainerReader(Core::IO::BodyStream& stream);
// Returns the m_eof flag (its declaration is not visible in this excerpt).
157 bool End()
const {
return m_eof; }
// Returns the next datum by forwarding to NextImpl with the stored object schema.
// m_objectSchema.get() is passed as-is, so it is null if the schema has not been
// set yet.
160 AvroDatum Next(
const Core::Context& context) {
return NextImpl(m_objectSchema.get(), context); }
// Declared here, defined elsewhere; takes the schema as a raw pointer.
163 AvroDatum NextImpl(
const AvroSchema* schema,
const Core::Context& context);
// Owned stream reader.
166 std::unique_ptr<AvroStreamReader> m_reader;
// Owned schema handed to NextImpl by Next().
167 std::unique_ptr<AvroSchema> m_objectSchema;
// NOTE(review): presumably the sync marker of the Avro object container file
// being read - confirm against the implementation.
168 std::string m_syncMarker;
// Counter initialised to zero.
// NOTE(review): presumably the number of objects left in the current block -
// confirm against the implementation.
169 int64_t m_remainingObjectInCurrentBlock = 0;
// BodyStream implementation that wraps another BodyStream, runs an
// AvroObjectContainerReader over it, and holds progress and error callbacks.
173 class AvroStreamParser final :
public Core::IO::BodyStream {
// Takes ownership of `inner` and moves both callbacks into members.
// m_parser is constructed from *m_inner: this is valid because m_inner is
// declared before m_parser below, and members are initialised in declaration
// order. Reordering those two members would break this initializer list.
175 explicit AvroStreamParser(
176 std::unique_ptr<Azure::Core::IO::BodyStream> inner,
177 std::function<
void(int64_t, int64_t)> progressCallback,
178 std::function<
void(BlobQueryError)> errorCallback)
179 : m_inner(std::move(inner)), m_parser(*m_inner),
180 m_progressCallback(std::move(progressCallback)), m_errorCallback(std::move(errorCallback))
// Always returns -1.
// NOTE(review): presumably the BodyStream convention for "length unknown" -
// confirm against the base class documentation.
184 int64_t Length()
const override {
return -1; }
// Rewinds the wrapped stream only.
// NOTE(review): m_parser and m_parserBuffer are not reset here - confirm that
// reading after a rewind behaves as intended.
185 void Rewind()
override { this->m_inner->Rewind(); }
// Read hook overriding the base class; defined elsewhere.
188 size_t OnRead(uint8_t* buffer,
size_t count,
const Azure::Core::Context& context)
override;
// Owned inner stream; must stay declared before m_parser (see constructor).
191 std::unique_ptr<Azure::Core::IO::BodyStream> m_inner;
// Container reader built over *m_inner.
192 AvroObjectContainerReader m_parser;
// Callback taking two int64_t values.
// NOTE(review): the meaning of the two arguments is not visible here - confirm.
193 std::function<void(int64_t, int64_t)> m_progressCallback;
// Callback taking a BlobQueryError.
194 std::function<void(BlobQueryError)> m_errorCallback;
// NOTE(review): presumably a view of parsed data not yet handed to the caller of
// OnRead - confirm against the implementation.
195 AvroDatum::StringView m_parserBuffer;