145 lines
5.4 KiB
C
145 lines
5.4 KiB
C
|
/*------------------------------------------------------------------------------*
|
||
|
* Architecture & Implementation of DBMS *
|
||
|
*------------------------------------------------------------------------------*
|
||
|
* Copyright 2022 Databases and Information Systems Group TU Dortmund *
|
||
|
* Visit us at *
|
||
|
* http://dbis.cs.tu-dortmund.de/cms/en/home/ *
|
||
|
* *
|
||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS *
|
||
|
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
|
||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
|
||
|
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR *
|
||
|
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, *
|
||
|
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR *
|
||
|
* OTHER DEALINGS IN THE SOFTWARE. *
|
||
|
* *
|
||
|
* Authors: *
|
||
|
* Maximilian Berens <maximilian.berens@tu-dortmund.de> *
|
||
|
* Roland Kühn <roland.kuehn@cs.tu-dortmund.de> *
|
||
|
* Jan Mühlig <jan.muehlig@tu-dortmund.de> *
|
||
|
*------------------------------------------------------------------------------*
|
||
|
*/
|
||
|
|
||
|
#pragma once
|
||
|
#include "unary_operator.h"
#include <cstddef>
#include <cstring>
#include <execution/aggregator.h>
#include <string_view>
#include <table/memory_table.h>
#include <unordered_map>
#include <utility>
#include <vector>
|
||
|
|
||
|
namespace beedb::execution
|
||
|
{
|
||
|
class GroupByKey
|
||
|
{
|
||
|
public:
|
||
|
GroupByKey(const std::vector<std::pair<std::uint16_t, std::uint16_t>> &grouped_columns,
|
||
|
const std::uint16_t grouped_row_size, const table::Tuple &tuple)
|
||
|
{
|
||
|
_data = new std::byte[grouped_row_size];
|
||
|
auto current_index = 0u;
|
||
|
for (const auto &[column_id, _] : grouped_columns)
|
||
|
{
|
||
|
const auto &column = tuple.schema().column(column_id);
|
||
|
const auto size = column.type().size();
|
||
|
std::memcpy(&_data[current_index], &tuple.data()[tuple.schema().offset(column_id)], size);
|
||
|
current_index += size;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
GroupByKey(const GroupByKey &) = delete;
|
||
|
|
||
|
GroupByKey(GroupByKey &&other) noexcept : _data(other._data)
|
||
|
{
|
||
|
other._data = nullptr;
|
||
|
}
|
||
|
|
||
|
~GroupByKey()
|
||
|
{
|
||
|
delete[] _data;
|
||
|
}
|
||
|
|
||
|
bool operator==(const GroupByKey &other) const
|
||
|
{
|
||
|
return std::strcmp(reinterpret_cast<const char *>(_data), reinterpret_cast<const char *>(other._data)) == 0;
|
||
|
}
|
||
|
|
||
|
explicit operator std::string_view() const
|
||
|
{
|
||
|
return std::string_view{reinterpret_cast<const char *>(this->_data)};
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
std::byte *_data;
|
||
|
};
|
||
|
|
||
|
class AggregateOperator : public UnaryOperator
|
||
|
{
|
||
|
public:
|
||
|
AggregateOperator(concurrency::Transaction *transaction, table::Schema &&schema,
|
||
|
std::vector<std::pair<std::uint16_t, std::uint16_t>> &&groups,
|
||
|
std::unordered_map<expression::Term, std::unique_ptr<AggregatorInterface>> &&aggregators);
|
||
|
~AggregateOperator() override = default;
|
||
|
|
||
|
void open() override;
|
||
|
util::optional<table::Tuple> next() override;
|
||
|
void close() override;
|
||
|
|
||
|
[[nodiscard]] const table::Schema &schema() const override
|
||
|
{
|
||
|
return _schema;
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
// The output schema, generated by this operator.
|
||
|
const table::Schema _schema;
|
||
|
|
||
|
// List of [output tuple schema index, aggregator]. The aggregator will produce
|
||
|
// a value for the tuple at the given index.
|
||
|
std::vector<std::pair<std::uint16_t, std::unique_ptr<AggregatorInterface>>> _aggregators;
|
||
|
|
||
|
// Map of child attribute index to this attribute index.
|
||
|
std::vector<std::pair<std::uint16_t, std::uint16_t>> _group_attribute_map;
|
||
|
|
||
|
// Because next() will be called multiple times, we have to
|
||
|
// remember if we have grouped, yet.
|
||
|
bool _has_grouped = false;
|
||
|
|
||
|
// Because next() will be called multiple times, we have to
|
||
|
// remember if we have aggregated, yet.
|
||
|
bool _has_aggregated = false;
|
||
|
|
||
|
// List of grouped tiles. May be empty, even if we have grouped.
|
||
|
std::vector<table::MemoryTable> _tiles;
|
||
|
|
||
|
// Because next() will be called multiple times, we have to remember
|
||
|
// the index of the tile we want to aggregate.
|
||
|
std::size_t _current_tile = 0u;
|
||
|
|
||
|
/**
|
||
|
* Groups the tuples returned by the given child into "tiles", which are
|
||
|
* memory tables, containing all tuples with the identical group key.
|
||
|
*
|
||
|
* @param group_attribute_map List of indices to group.
|
||
|
* @param source Child that produces tuples.
|
||
|
* @return List of tiles.
|
||
|
*/
|
||
|
static std::vector<table::MemoryTable> group(
|
||
|
const std::vector<std::pair<std::uint16_t, std::uint16_t>> &group_attribute_map,
|
||
|
const std::unique_ptr<OperatorInterface> &source);
|
||
|
};
|
||
|
} // namespace beedb::execution
|
||
|
|
||
|
namespace std
|
||
|
{
|
||
|
template <> struct hash<beedb::execution::GroupByKey>
|
||
|
{
|
||
|
public:
|
||
|
std::size_t operator()(const beedb::execution::GroupByKey &key) const
|
||
|
{
|
||
|
return std::hash<std::string_view>()(static_cast<std::string_view>(key));
|
||
|
}
|
||
|
};
|
||
|
} // namespace std
|