Skip to content

Commit 7186aa5

Browse files
committed
separate different streaming join
1 parent 16a408e commit 7186aa5

36 files changed

+1248
-415
lines changed

src/Core/BlockWithShard.h

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/Core/DataBlockWithShard.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once
2+
3+
#include <Core/Block.h>
4+
#include <Core/LightChunk.h>
5+
6+
namespace DB
7+
{
8+
template <typename DataBlock>
9+
struct DataBlockWithShard
10+
{
11+
DataBlock block;
12+
int32_t shard;
13+
14+
DataBlockWithShard(DataBlock && block_, int32_t shard_) : block(std::move(block_)), shard(shard_) { }
15+
};
16+
17+
using BlockWithShard = DataBlockWithShard<Block>;
18+
using BlocksWithShard = std::vector<BlockWithShard>;
19+
20+
using LightChunkWithShard = DataBlockWithShard<LightChunk>;
21+
using LightChunksWithShard = std::vector<LightChunkWithShard>;
22+
}
23+

src/Core/LightChunk.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ struct LightChunk
2727
void concat(const LightChunk & other)
2828
{
2929
auto added_rows = other.rows();
30+
if (added_rows <= 0)
31+
return;
32+
3033
assert(columns() == other.columns());
3134
for (size_t c = 0; auto & col : data)
3235
{
@@ -35,9 +38,21 @@ struct LightChunk
3538
}
3639
}
3740

41+
LightChunk cloneEmpty() const
42+
{
43+
LightChunk res;
44+
res.data.reserve(data.size());
45+
46+
for (const auto & elem : data)
47+
res.data.emplace_back(elem->cloneEmpty());
48+
49+
return res;
50+
}
51+
3852
size_t rows() const noexcept { return data.empty() ? 0 : data[0]->size(); }
3953
size_t columns() const noexcept { return data.size(); }
4054

55+
Columns & getColumns() noexcept { return data; }
4156
const Columns & getColumns() const noexcept { return data; }
4257
Columns detachColumns() noexcept { return std::move(data); }
4358

@@ -88,7 +103,9 @@ struct LightChunkWithTimestamp
88103
LightChunkWithTimestamp() = default;
89104
LightChunkWithTimestamp(Columns && data_) : chunk(std::move(data_)) { }
90105
LightChunkWithTimestamp(Chunk && chunk_, Int64 min_ts, Int64 max_ts)
91-
: chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts) { }
106+
: chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts)
107+
{
108+
}
92109
LightChunkWithTimestamp(const Block & block)
93110
: chunk(block), min_timestamp(block.minTimestamp()), max_timestamp(block.maxTimestamp()) { }
94111

@@ -122,4 +139,4 @@ struct LightChunkWithTimestamp
122139
Int64 maxTimestamp() const noexcept { return max_timestamp; }
123140
};
124141

125-
}
142+
}

src/Interpreters/ExpressionAnalyzer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2331,7 +2331,7 @@ std::shared_ptr<IJoin> SelectQueryExpressionAnalyzer::chooseJoinAlgorithmStreami
23312331
return std::make_shared<Streaming::ConcurrentHashJoin>(
23322332
analyzed_join, max_threads, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
23332333
else
2334-
return std::make_shared<Streaming::HashJoin>(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
2334+
return Streaming::HashJoin::create(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
23352335
}
23362336
/// proton : ends
23372337

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <Interpreters/Streaming/HashJoin.h>
4+
5+
namespace DB
6+
{
7+
namespace Streaming
8+
{
9+
class AsofHashJoin final : public HashJoin
10+
{
11+
public:
12+
using HashJoin::HashJoin;
13+
HashJoinType type() const override { return HashJoinType::Asof; }
14+
};
15+
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <Interpreters/Streaming/HashJoin.h>
4+
5+
namespace DB
6+
{
7+
namespace Streaming
8+
{
9+
class BidirectionalAllHashJoin final : public HashJoin
10+
{
11+
public:
12+
using HashJoin::HashJoin;
13+
HashJoinType type() const override { return HashJoinType::BidirectionalAll; }
14+
};
15+
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <Interpreters/Streaming/HashJoin.h>
4+
5+
namespace DB
6+
{
7+
namespace Streaming
8+
{
9+
class BidirectionalChangelogHashJoin final : public HashJoin
10+
{
11+
public:
12+
using HashJoin::HashJoin;
13+
HashJoinType type() const override { return HashJoinType::BidirectionalChangelog; }
14+
};
15+
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <Interpreters/Streaming/HashJoin.h>
4+
5+
namespace DB
6+
{
7+
namespace Streaming
8+
{
9+
class BidirectionalRangeHashJoin final : public HashJoin
10+
{
11+
public:
12+
using HashJoin::HashJoin;
13+
HashJoinType type() const override { return HashJoinType::BidirectionalRange; }
14+
};
15+
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#pragma once
2+
3+
#include <Interpreters/Streaming/HashJoin.h>
4+
5+
namespace DB
6+
{
7+
namespace Streaming
8+
{
9+
class ChangelogHashJoin final : public HashJoin
10+
{
11+
public:
12+
using HashJoin::HashJoin;
13+
HashJoinType type() const override { return HashJoinType::Changelog; }
14+
};
15+
16+
}
17+
}

0 commit comments

Comments
 (0)