14namespace Azure {
namespace Storage {
namespace _internal {
16 inline void ConcurrentTransfer(
22 std::function<
void(int64_t, int64_t, int64_t, int64_t)> transferFunc)
24 std::atomic<int> numWorkingThreads{concurrency};
25 std::atomic<int> nextChunkId{0};
26 std::atomic<bool> failed{
false};
28 const auto numChunks = (length + chunkSize - 1) / chunkSize;
30 auto threadFunc = [&]() {
33 int chunkId = nextChunkId.fetch_add(1);
34 if (chunkId >= numChunks || failed)
38 int64_t chunkOffset = offset + chunkSize * chunkId;
39 int64_t chunkLength = (std::min)(length - chunkSize * chunkId, chunkSize);
42 transferFunc(chunkOffset, chunkLength, chunkId, numChunks);
44 catch (
const std::exception&)
46 if (failed.exchange(
true) ==
false)
48 numWorkingThreads.fetch_sub(1);
53 numWorkingThreads.fetch_sub(1);
56 std::vector<std::future<void>> threadHandles;
57 for (
int i = 0; i < std::min<int64_t>(concurrency, numChunks) - 1; ++i)
59 threadHandles.emplace_back(std::async(std::launch::async, threadFunc));
62 for (
auto& handle : threadHandles)