Loading [MathJax]/extensions/tex2jax.js
azure-storage-blobs
All Classes Functions Variables Pages
avro_parser.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#pragma once
5
#include "azure/storage/blobs/blob_options.hpp"

#include <azure/core/io/body_stream.hpp>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iterator>
#include <map>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
13
14namespace Azure { namespace Storage { namespace Blobs { namespace _detail {
/**
 * @brief The kinds of Avro values handled by this parser.
 *
 * @remark Enumerator values are written out explicitly; they are identical to the implicit
 * 0-based numbering that follows the declaration order.
 */
enum class AvroDatumType
{
  // Primitive kinds.
  String = 0,
  Bytes = 1,
  Int = 2,
  Long = 3,
  Float = 4,
  Double = 5,
  Bool = 6,
  Null = 7,
  // Composite kinds.
  Record = 8,
  Enum = 9,
  Array = 10,
  Map = 11,
  Union = 12,
  Fixed = 13,
};
32
33 class AvroStreamReader final {
34 public:
35 // position of a vector that lives through vector resizing
36 struct ReaderPos final
37 {
38 const std::vector<uint8_t>* BufferPtr = nullptr;
39 size_t Offset = 0;
40 };
41 explicit AvroStreamReader(Core::IO::BodyStream& stream)
42 : m_stream(&stream), m_pos{&m_streambuffer, 0}
43 {
44 }
45 AvroStreamReader(const AvroStreamReader&) = delete;
46 AvroStreamReader& operator=(const AvroStreamReader&) = delete;
47
48 int64_t ParseInt(const Core::Context& context);
49 void Advance(size_t n, const Core::Context& context);
50 // Read at least n bytes from m_stream and append data to m_streambuffer. Return number of bytes
51 // available in m_streambuffer;
52 size_t Preload(size_t n, const Core::Context& context);
53 size_t TryPreload(size_t n, const Core::Context& context);
54 // discards data that's before m_pos
55 void Discard();
56
57 private:
58 size_t AvailableBytes() const { return m_streambuffer.size() - m_pos.Offset; }
59
60 private:
61 Core::IO::BodyStream* m_stream;
62 std::vector<uint8_t> m_streambuffer;
63 ReaderPos m_pos;
64
65 friend class AvroDatum;
66 };
67
68 class AvroSchema final {
69 public:
70 static const AvroSchema StringSchema;
71 static const AvroSchema BytesSchema;
72 static const AvroSchema IntSchema;
73 static const AvroSchema LongSchema;
74 static const AvroSchema FloatSchema;
75 static const AvroSchema DoubleSchema;
76 static const AvroSchema BoolSchema;
77 static const AvroSchema NullSchema;
78 static AvroSchema RecordSchema(
79 std::string name,
80 const std::vector<std::pair<std::string, AvroSchema>>& fieldsSchema);
81 static AvroSchema ArraySchema(AvroSchema elementSchema);
82 static AvroSchema MapSchema(AvroSchema elementSchema);
83 static AvroSchema UnionSchema(std::vector<AvroSchema> schemas);
84 static AvroSchema FixedSchema(std::string name, int64_t size);
85
86 const std::string& Name() const { return m_name; }
87 AvroDatumType Type() const { return m_type; }
88 const std::vector<std::string>& FieldNames() const { return m_status->m_keys; }
89 AvroSchema ItemSchema() const { return m_status->m_schemas[0]; }
90 const std::vector<AvroSchema>& FieldSchemas() const { return m_status->m_schemas; }
91 size_t Size() const { return static_cast<size_t>(m_status->m_size); }
92
93 private:
94 explicit AvroSchema(AvroDatumType type) : m_type(type) {}
95
96 private:
97 AvroDatumType m_type;
98 std::string m_name;
99
100 struct SharedStatus
101 {
102 std::vector<std::string> m_keys;
103 std::vector<AvroSchema> m_schemas;
104 int64_t m_size = 0;
105 };
106 std::shared_ptr<SharedStatus> m_status;
107 };
108
109 class AvroDatum final {
110 public:
111 AvroDatum() : m_schema(AvroSchema::NullSchema) {}
112 explicit AvroDatum(AvroSchema schema) : m_schema(std::move(schema)) {}
113
114 void Fill(AvroStreamReader& reader, const Core::Context& context);
115 void Fill(AvroStreamReader::ReaderPos& data);
116
117 const AvroSchema& Schema() const { return m_schema; }
118
119 template <class T> T Value() const;
120 struct StringView
121 {
122 const uint8_t* Data = nullptr;
123 size_t Length = 0;
124 };
125
126 private:
127 AvroSchema m_schema;
128 AvroStreamReader::ReaderPos m_data;
129 };
130
131 using AvroMap = std::map<std::string, AvroDatum>;
132
133 class AvroRecord final {
134 public:
135 bool HasField(const std::string& key) const { return FindField(key) != m_keys->size(); }
136 const AvroDatum& Field(const std::string& key) const { return m_values.at(FindField(key)); }
137 AvroDatum& Field(const std::string& key) { return m_values.at(FindField(key)); }
138 const AvroDatum& FieldAt(size_t i) const { return m_values.at(i); }
139 AvroDatum& FieldAt(size_t i) { return m_values.at(i); }
140
141 private:
142 size_t FindField(const std::string& key) const
143 {
144 auto i = find(m_keys->begin(), m_keys->end(), key);
145 return i - m_keys->begin();
146 }
147 const std::vector<std::string>* m_keys = nullptr;
148 std::vector<AvroDatum> m_values;
149
150 friend class AvroDatum;
151 };
152
153 class AvroObjectContainerReader final {
154 public:
155 explicit AvroObjectContainerReader(Core::IO::BodyStream& stream);
156
157 bool End() const { return m_eof; }
158 // Calling Next() will invalidates the previous AvroDatum returned by this function and all
159 // AvroDatums propagated from there.
160 AvroDatum Next(const Core::Context& context) { return NextImpl(m_objectSchema.get(), context); }
161
162 private:
163 AvroDatum NextImpl(const AvroSchema* schema, const Core::Context& context);
164
165 private:
166 std::unique_ptr<AvroStreamReader> m_reader;
167 std::unique_ptr<AvroSchema> m_objectSchema;
168 std::string m_syncMarker;
169 int64_t m_remainingObjectInCurrentBlock = 0;
170 bool m_eof = false;
171 };
172
173 class AvroStreamParser final : public Core::IO::BodyStream {
174 public:
175 explicit AvroStreamParser(
176 std::unique_ptr<Azure::Core::IO::BodyStream> inner,
177 std::function<void(int64_t, int64_t)> progressCallback,
178 std::function<void(BlobQueryError)> errorCallback)
179 : m_inner(std::move(inner)), m_parser(*m_inner),
180 m_progressCallback(std::move(progressCallback)), m_errorCallback(std::move(errorCallback))
181 {
182 }
183
184 int64_t Length() const override { return -1; }
185 void Rewind() override { this->m_inner->Rewind(); }
186
187 private:
188 size_t OnRead(uint8_t* buffer, size_t count, const Azure::Core::Context& context) override;
189
190 private:
191 std::unique_ptr<Azure::Core::IO::BodyStream> m_inner;
192 AvroObjectContainerReader m_parser;
193 std::function<void(int64_t, int64_t)> m_progressCallback;
194 std::function<void(BlobQueryError)> m_errorCallback;
195 AvroDatum::StringView m_parserBuffer;
196 };
197
198}}}} // namespace Azure::Storage::Blobs::_detail