Loading [MathJax]/jax/input/TeX/config.js
azure-storage-common
All Classes Functions Variables Pages
concurrent_transfer.hpp
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4#pragma once
5
6#include <algorithm>
7#include <atomic>
8#include <cstdlib>
9#include <functional>
10#include <future>
11#include <stdexcept>
12#include <vector>
13
14namespace Azure { namespace Storage { namespace _internal {
15
16 inline void ConcurrentTransfer(
17 int64_t offset,
18 int64_t length,
19 int64_t chunkSize,
20 int concurrency,
21 // offset, length, chunk ID, number of chunks
22 std::function<void(int64_t, int64_t, int64_t, int64_t)> transferFunc)
23 {
24 std::atomic<int> numWorkingThreads{concurrency};
25 std::atomic<int> nextChunkId{0};
26 std::atomic<bool> failed{false};
27
28 const auto numChunks = (length + chunkSize - 1) / chunkSize;
29
30 auto threadFunc = [&]() {
31 while (true)
32 {
33 int chunkId = nextChunkId.fetch_add(1);
34 if (chunkId >= numChunks || failed)
35 {
36 break;
37 }
38 int64_t chunkOffset = offset + chunkSize * chunkId;
39 int64_t chunkLength = (std::min)(length - chunkSize * chunkId, chunkSize);
40 try
41 {
42 transferFunc(chunkOffset, chunkLength, chunkId, numChunks);
43 }
44 catch (const std::exception&)
45 {
46 if (failed.exchange(true) == false)
47 {
48 numWorkingThreads.fetch_sub(1);
49 throw;
50 }
51 }
52 }
53 numWorkingThreads.fetch_sub(1);
54 };
55
56 std::vector<std::future<void>> threadHandles;
57 for (int i = 0; i < std::min<int64_t>(concurrency, numChunks) - 1; ++i)
58 {
59 threadHandles.emplace_back(std::async(std::launch::async, threadFunc));
60 }
61 threadFunc();
62 for (auto& handle : threadHandles)
63 {
64 handle.get();
65 }
66 }
67
68}}} // namespace Azure::Storage::_internal