Compute-Storage Separation Mode
Design Objectives
CnosDB 2.0 is developed in Rust, chosen for its safety, high performance, and community momentum. It provides users with an excellent time series database and forms a complete DBaaS solution.
Time series database
- Massive time series: theoretically there is no limit on the number of time series, completely solving the problem of series cardinality expansion, and supporting scale-out.
- Compute-storage separation: compute nodes and storage nodes can scale out and in independently, at second-level speed.
- Storage performance and cost: a high-performance I/O stack supports tiered storage across cloud disks and object storage.
- The query engine supports vectorized queries.
- Supports multiple time series protocols for writing and querying, and provides data import from external components.
Cloud Native
- Supports cloud-native deployments, making full use of cloud infrastructure and integrating into the cloud-native ecosystem.
- High availability with second-level failure recovery; supports multi-cloud and cross-region disaster recovery.
- Native multi-tenancy with pay-as-you-go billing.
- CDC: logs can be subscribed to and distributed to other nodes.
- Provides more configuration options to meet the complex, multi-scenario requirements of public cloud users.
- Cloud-edge collaboration, providing the ability to integrate edge deployments with the public cloud.
- Integrates with the OLAP / cloud AI data ecosystem on the cloud.
In redesigning the time series database, we try to solve as many of the problems faced by current time series databases as possible, form a complete set of time series data solutions and a time series ecosystem, and provide DBaaS services on public clouds.
We elaborate on the design from the following aspects:
- Data replication and consensus
- Meta cluster
- SQL engine
- tskv index and data storage
Data Replication and Consensus
The sharding rule of CnosDB 2.0 is based on time ranges: data is placed into the corresponding Bucket according to DB + time_range. A Bucket is a virtual logical unit with the following main properties; a Bucket creates multiple shards based on user configuration to spread data (assume the shard number is 1).
> 「db, shard_id, time_range, create_time, end_time, List\<Vnode>」
A Vnode is a virtual execution unit that is placed on a specific Node. Each Vnode is a separate LSM tree; its corresponding tsfamily structure is a separate execution unit.
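As a rough illustration of the metadata above, the sketch below models a Bucket and its Vnodes as plain Rust structs; the field names and types are assumptions derived from the property list, not CnosDB's actual definitions.

```rust
/// Illustrative sketch only: a Bucket as described above, holding the
/// shards/Vnodes for one db over one time range (field names assumed).
pub struct BucketInfo {
    pub db: String,
    pub shard_id: u32,
    pub time_range: (i64, i64), // start/end timestamps covered by this Bucket
    pub create_time: i64,
    pub end_time: i64,
    pub vnodes: Vec<VnodeInfo>,
}

/// A Vnode is a virtual execution unit assigned to a concrete Node;
/// each one corresponds to a separate LSM tree (tsfamily).
pub struct VnodeInfo {
    pub vnode_id: u32,
    pub node_id: u64, // the physical Node this Vnode is placed on
}
```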
Replicaset
Data high availability is maintained through replica sets. Each db has its own replication factor, which represents the number of data replicas. A set of Vnodes within the same Bucket forms a replica group that holds the same data and inverted-index information.
Placement Rule
To cope with correlated failures, the meta node may need to ensure, when creating a Bucket, that data replicas are placed on devices across different nodes, racks, power supplies, controllers, and physical locations. Considering that different tenants access data from different regions, Vnodes should be scheduled and placed in a cost-optimal way.
Data Separation Strategy
Data from different tenants on a Node is physically segregated.
/User/db/bucket/replicaset_id/vnode_id
Data Consensus Based on Quorum Mechanism
CnosDB 2.0 is implemented as an eventually consistent system.
The module that uses the Quorum mechanism for data consensus and handles read and write requests is called the coordinator. Its responsibilities include:
Meta information caching and interaction with meta nodes
Gets Vnode information according to user, db, and time range, maintains a local cache, and pulls the VnodeList from the remote meta cluster on a cache miss. Provides a MetaClient trait.
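A minimal sketch of what such a MetaClient trait might look like; the method names, argument types, and caching behavior shown here are assumptions for illustration, not the actual CnosDB interface.

```rust
/// Hypothetical routing types used only in this sketch.
pub struct TimeRange { pub min_ts: i64, pub max_ts: i64 }
pub struct VnodeInfo { pub vnode_id: u32, pub node_id: u64 }

/// Sketch of a MetaClient: resolve routing information from a local cache,
/// falling back to the remote meta cluster on a miss.
pub trait MetaClient {
    /// Return the Vnode list for (tenant, db, time range); consult the local
    /// cache first and pull the VnodeList from the meta cluster if missing.
    fn vnode_list(&self, tenant: &str, db: &str, range: &TimeRange) -> Vec<VnodeInfo>;

    /// Refresh or invalidate cached entries when the meta cluster publishes
    /// a change notification.
    fn refresh(&mut self, tenant: &str, db: &str);
}
```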
Connection management
Manages connections with different tskvs for data reading/writing.
Proxying data read/write/delete operations
Users can configure the data to use a variety of consistency levels.
```rust
pub enum ConsistencyLevel {
    /// allows for hinted handoff, potentially no write happened yet.
    Any,
    /// at least one data node acknowledged a write/read.
    One,
    /// a quorum of data nodes to acknowledge a write/read.
    Quorum,
    /// requires all data nodes to acknowledge a write/read.
    All,
}
```

Hinted handoff
To handle a temporary failure of a target node, the coordinator node provides a hinted-handoff capability: the write is persisted in the coordinator node's hinted-handoff queue, and once the replica node recovers, the data is replayed and restored from the hinted-handoff queue.
Data Writing
When a write request is received, the coordinator determines the physical nodes where the data should be stored based on the partition policy and the corresponding placement rule. As long as at least W nodes return success, the write operation is considered successful.
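The W-acknowledgement rule can be sketched as below; `send_to_replica` is a hypothetical stand-in for the real write RPC, and the quorum calculation mirrors the consistency levels listed earlier.

```rust
/// Sketch of the coordinator write path: fan the write out to every replica
/// in the group and succeed once at least `w` of them acknowledge.
/// For the Quorum level, w = replicas.len() / 2 + 1.
fn quorum_write(replicas: &[u64], w: usize) -> bool {
    let acks = replicas
        .iter()
        .filter(|node_id| send_to_replica(**node_id)) // hypothetical write RPC
        .count();
    acks >= w
}

/// Stand-in for the real replica write; a temporarily failed node would
/// instead have its write queued in the coordinator's hinted-handoff queue.
fn send_to_replica(_node_id: u64) -> bool {
    true
}

fn main() {
    let replicas = [1u64, 2, 3];
    let w = replicas.len() / 2 + 1; // quorum of a 3-replica group is 2
    assert!(quorum_write(&replicas, w));
}
```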
Write Process
Data Reading
When a read request is received, the coordinator determines the physical nodes where the data is stored based on the partition policy and the corresponding placement rule, and requests the data for the key. At present we do not perform read repair and initiate only one read request; if that read is slow, a second read request is initiated.
Conflict Resolution
- When data conflicts occur in a time series scenario, consistent hashing is used to select the first replica as the confirmation point.
- At the same time, a last-write-wins strategy is used to resolve conflicts.
Meta Cluster
A strongly consistent meta cluster is maintained through Raft. The meta cluster serves its API externally, and nodes also subscribe to updates of meta information. All metadata updates go through the meta cluster.
- Database catalog information and DDL operations.
- Node heartbeat/registration and node load statistics, which the coordinator uses as the basis for selecting nodes for reads and writes.
- Tenant and sub-user information and the related permissions.
- Data routing information: the VnodeList corresponding to tenant / db / bucket / replicaset.
- Provides the functionality of distributed locks and watch change notifications.
We adopt a strongly consistent meta cluster and apply the corresponding optimizations, for the following reasons:
- In practice, cluster metadata is usually kept at a small scale and has no scalability requirements.
- The engineering work is relatively simple, which is conducive to rapid iteration.
- Frequently accessed data is cached and stored locally to optimize access.
- After data is stored locally, nodes subscribe to schema version changes from the meta cluster to relieve read pressure on the meta cluster.
- The meta cluster shares the leader's load by providing a Follower Read scheme, optimizing read performance.
SQL Engine
We use DataFusion as the query engine. DataFusion is an extensible query execution framework written in Rust that uses Apache Arrow as its in-memory format. DataFusion supports SQL and a DataFrame API for building logical query plans, as well as a query optimizer and an execution engine that can execute in parallel against partitioned data sources using threads. It has the following advantages:
- High performance: built on Rust and the Arrow memory model, it delivers high performance.
- Strong extensibility: almost every point in its design can be extended and customized for a specific use case.
- High quality: DataFusion and the Arrow ecosystem are widely tested and can be used in production systems.
- Big data ecosystem integration: as part of the Apache Arrow ecosystem (Arrow, Flight, Parquet), it integrates well with big data ecosystems.
By extending DataFusion's data sources and providing custom SQL statements, the query process for data in distributed scenarios is as follows:
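As a standalone illustration of how a query flows through DataFusion (not CnosDB's actual integration), the sketch below registers a CSV source and runs SQL over it; the table name, column names, and file path are placeholders, and the API shown assumes a recent DataFusion release.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Register a data source with the session; CnosDB would instead plug in
    // a custom, tskv-backed table provider here.
    let ctx = SessionContext::new();
    ctx.register_csv("example", "data/example.csv", CsvReadOptions::new())
        .await?;

    // DataFusion parses the SQL into a logical plan, optimizes it, and
    // executes it in parallel over the partitioned data source.
    let df = ctx
        .sql("SELECT tag1, max(value) FROM example WHERE time > 100 GROUP BY tag1")
        .await?;
    df.show().await?;
    Ok(())
}
```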
TSKV Index and Data Storage
tskv is mainly responsible for data and index storage. It manages all Vnodes on a node, and each Vnode is responsible for part of the data in a db. A Vnode is mainly composed of three modules: WAL, Index Engine, and Data Engine.
Index Engine
Indexes for time series data typically see far more reads than writes. They are mainly used for fast lookups and tag-key-based conditional filtering to pick out the right series.
The main functions are:
- Storing the forward index
- Storing the inverted index
- Caching catalog information
Common query statements:
SELECT xxx FROM table WHERE tag1 = value1 AND tag2 = value2 [AND time > aaa AND time \< bbb] [GROUP BY | ORDER BY | LIMIT ...]
The index design is mainly aimed at the WHERE filter conditions; it is used to reduce the search space and speed up queries.
Support the following filtering conditions:
- Equality and inequality, e.g. tag = value, tag != value
- Greater than and less than, e.g. tag \< value
- Prefix matching, e.g. tag = aaa_*
- Regular expressions, e.g. tag = aaa*bbb
Indexes are built when data is written. In a time series database every tag is indexed, and the values of the multiple tags are combined into a SeriesKey. Although time series databases are write-heavy and read-light, index usage during writes is mostly reads rather than builds: writes usually hit different time points of the same series, so the index information for a series only needs to be built the first time the series is written; if the series already exists (an index read), it is not indexed again.
Storage structure
1. Based on a hash function, compute the HashID (a 24-bit integer, about 16 million possible values): `hash(SeriesKey) -> HashID`
2. SeriesID (uint64) is obtained as `HashID << 40 | auto_increment_id -> SeriesID`
3. FieldID (uint64) is composed of SeriesID and TableFieldID (each field has a number within its table, the TableFieldID).
Limitations:
- The number of possible HashIDs is about 16 million, so hundreds of millions of series on a single machine will lengthen the per-HashID lists and slow down lookups.
- The high 24 bits of FieldID are the TableFieldID, and the lower 40 bits are the lower 40 bits of SeriesID (sketched below).
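The SeriesID / FieldID bit layout above can be expressed directly with bit operations; this is a hedged sketch whose helper names are ours, with the field widths taken from the description above.

```rust
/// SeriesID: high 24 bits are the HashID, low 40 bits the auto-increment id.
fn series_id(hash_id: u32, auto_increment_id: u64) -> u64 {
    ((hash_id as u64 & 0xFF_FFFF) << 40) | (auto_increment_id & 0xFF_FFFF_FFFF)
}

/// FieldID: high 24 bits are the TableFieldID, low 40 bits are the low
/// 40 bits of the SeriesID.
fn field_id(table_field_id: u32, series_id: u64) -> u64 {
    ((table_field_id as u64 & 0xFF_FFFF) << 40) | (series_id & 0xFF_FFFF_FFFF)
}

/// Recover the HashID from a SeriesID (its high 24 bits), as used when
/// looking up a SeriesKey by SeriesID.
fn hash_id_of(series_id: u64) -> u32 {
    (series_id >> 40) as u32
}

fn main() {
    let sid = series_id(0xAB, 42);
    assert_eq!(hash_id_of(sid), 0xAB);
    let fid = field_id(7, sid);
    assert_eq!(fid & 0xFF_FFFF_FFFF, sid & 0xFF_FFFF_FFFF);
}
```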
The TSM data file stores FieldIDs and the corresponding data. The information about SeriesKeys is stored in the index file; the index data organization is described below.
Design of index data structure
- HashList: `HashID -> List<(SeriesKey, SeriesID)>`, used for mutual lookup between SeriesKey and SeriesID (see the sketch after this list).
  - Looking up a SeriesID by SeriesKey: `hash(SeriesKey) -> HashID`, get `List<(SeriesKey, SeriesID)>` from the HashList, then traverse the list to find the SeriesID.
  - Looking up a SeriesKey by SeriesID: take the high 24 bits of the SeriesID as the HashID; the lookup process is the same.
- Tag index: `TagValue -> List<SeriesID>`, implements indexing on tags and is used for filtering by tag query conditions.
  - Query condition `where tag = value`: get the list of SeriesIDs for the TagValue, then derive the FieldIDs and load data from TSM files.
  - Multiple query conditions (intersection and union) require set operations over multiple `List<SeriesID>` values.
  - TagValues need to be stored in order so they can be traversed, which is used for `show tag value` queries.
- The HashList structure is maintained as a single copy and loaded lazily in memory. Both `HashID -> List<(SeriesKey, SeriesID)>` and `TagValue -> List<SeriesID>` are persisted.
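A minimal in-memory sketch of the two maps described above; CnosDB persists these structures rather than keeping plain HashMaps, so this is illustrative only.

```rust
use std::collections::HashMap;

type SeriesId = u64;

/// Forward map: HashID -> list of (SeriesKey, SeriesID),
/// used to translate between SeriesKey and SeriesID.
type HashList = HashMap<u32, Vec<(String, SeriesId)>>;

/// Inverted index: tag value -> list of SeriesIDs,
/// used to evaluate `where tag = value` filters.
type TagIndex = HashMap<String, Vec<SeriesId>>;

/// Evaluate `tag1 = v1 AND tag2 = v2` by intersecting the two SeriesID lists.
fn filter_and(index: &TagIndex, v1: &str, v2: &str) -> Vec<SeriesId> {
    let a = index.get(v1).cloned().unwrap_or_default();
    let b = index.get(v2).cloned().unwrap_or_default();
    a.into_iter().filter(|id| b.contains(id)).collect()
}

fn main() {
    let mut index: TagIndex = HashMap::new();
    index.insert("host=a".into(), vec![1, 2]);
    index.insert("region=bj".into(), vec![2, 3]);
    assert_eq!(filter_and(&index, "host=a", "region=bj"), vec![2]);

    let _hash_list: HashList = HashMap::new(); // persisted on disk in practice
}
```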
Data Engine
Time series data storage is typically write-heavy and read-light, so an LSM model is used: the priority is to write data quickly, while expired and deleted data is removed through compaction. The Data Engine is divided into the following modules:
WAL module
As a write-ahead log, the WAL appends each write operation to the WAL file on disk before it is applied to memory, so that after a crash, memory can be restored to a consistent state. When a write request is received, wal_job first checks whether the current WAL file is full; if it is, a new one is created, and the request is then written in a fixed format. Each request corresponds to its own seq_no, which increments to record how many batches have been written since startup. The wal_job thread returns this seq_no to the main thread. All points of the same batch carry the same seq_no, whether in memory or written to TSM files, and are processed by seq_no.
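A simplified view of the WAL append path described above; the on-disk framing (`seq_no`, length, payload) and file name are assumptions for illustration, not the actual WAL format.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};

/// Append one write batch to the WAL and return its seq_no.
/// Illustrative framing: [seq_no: u64][len: u32][payload bytes].
fn wal_append(file: &mut File, seq_no: &mut u64, payload: &[u8]) -> io::Result<u64> {
    *seq_no += 1; // every point in this batch shares this seq_no
    file.write_all(&seq_no.to_le_bytes())?;
    file.write_all(&(payload.len() as u32).to_le_bytes())?;
    file.write_all(payload)?;
    file.sync_data()?; // persist before the write is applied to memory
    Ok(*seq_no)
}

fn main() -> io::Result<()> {
    let mut wal = OpenOptions::new().create(true).append(true).open("demo.wal")?;
    let mut seq_no = 0u64;
    let seq = wal_append(&mut wal, &mut seq_no, b"cpu,host=a value=1 1690000000")?;
    println!("wrote batch with seq_no {seq}");
    Ok(())
}
```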
TimeSeriesFamily
TimeSeriesFamily, usually abbreviated tsfamily, is the storage unit for time-ordered data; it holds the metadata for the in-memory data and for the corresponding on-disk data. Before writing data we generate the SeriesID and FieldID from the data's tags and fields. The coordinator obtains the Bucket from the db and time range, computes the TseriesFamilyID as hash(SeriesID) % shard_nums, and writes the data to that tsfamily. The tsfamily members are as follows:
```rust
pub struct TseriesFamily {
    tf_id: u32,
    delta_mut_cache: Arc<RwLock<MemCache>>,
    delta_immut_cache: Vec<Arc<RwLock<MemCache>>>,
    mut_cache: Arc<RwLock<MemCache>>,
    immut_cache: Vec<Arc<RwLock<MemCache>>>,
    super_version: Arc<SuperVersion>,
    super_version_id: AtomicU64,
    version: Arc<RwLock<Version>>,
    opts: Arc<TseriesFamOpt>,
    seq_no: u64,
    immut_ts_min: i64,
    mut_ts_max: i64,
}
```

- `tf_id`: the identifier of the tsfamily; each tsfamily has a unique tf_id.
- `mut_cache`: cache for the most recently written data.
- `immut_cache`: when the `mut_cache` is full it is converted into an `immut_cache`; `immut_cache` is flushed to disk to generate TSM files.
- `super_version`: a snapshot of the current `mut_cache` and `immut_cache` of the tsfamily.
- `version`: maintains a snapshot of the on-disk data of the current tsfamily.

Recover and Summary
Summary is a metadata file generated from changes to TSM file versions. The summary file stores the version-change metadata (`version_edit`) used to recover the `version_set` metadata after a crash. When a node runs for a long time the summary file grows large, so we periodically compact the summary file to reduce crash-recovery time.

tskv first performs the recover procedure when it is created:
- Obtain the Summary structure from the summary file.
- From the `last_seq` in the Summary structure's ctx, determine which batches have already been flushed.
- From the WAL file and `last_seq`, replay into memory the data that has not yet been flushed (a sketch follows this list).
- Restore `version_set` based on the summary file.
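The WAL-replay part of the recover flow reduces to filtering by `last_seq`; the sketch below uses a hypothetical WalEntry type just to show the rule.

```rust
/// Hypothetical WAL entry used only in this sketch.
struct WalEntry { seq_no: u64, payload: Vec<u8> }

/// Recover keeps only the batches that were not yet flushed, i.e. those
/// whose seq_no is greater than the last_seq recorded in the summary.
fn entries_to_replay(wal: Vec<WalEntry>, last_seq: u64) -> Vec<WalEntry> {
    wal.into_iter().filter(|e| e.seq_no > last_seq).collect()
}

fn main() {
    let wal = vec![
        WalEntry { seq_no: 1, payload: vec![] }, // already flushed
        WalEntry { seq_no: 2, payload: vec![] }, // must be replayed into memcache
    ];
    assert_eq!(entries_to_replay(wal, 1).len(), 1);
}
```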
Flush
When the `immut_cache` capacity in a tsfamily reaches a certain threshold, a flush is started after the write operation completes: when the `immut_cache` is found to be full, it is packed into a `flush_request`, which is received and processed by the `flush_job` thread (sketched after the list below).

- Take the data out of the `flush_request` and create a `flush_task` based on the data, then execute it.
- Create a TSM file according to the TseriesFamilyID and FileID, and write the data into the TSM file.
- According to the file information, apply the metadata to the corresponding `levels_info` of the version.
- Generate a `version_edit` based on the modifications to the version, the `seq_no`, the TseriesFamilyID, etc.
- Send all generated `version_edit`s to the `summary_task_sender` created when tskv was created; the summary thread receives the request, processes it, and writes the `version_edit` to the summary file.
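The hand-off between the write path and the flush thread can be sketched with a channel; the FlushRequest type and its fields are hypothetical, and the real flush_job additionally writes TSM files and emits version_edits as listed above.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical flush request carrying the frozen immut_cache contents
/// as (field_id, timestamp, value) triples.
struct FlushRequest { tsf_id: u32, points: Vec<(u64, i64, f64)> }

fn main() {
    // The write path packs a full immut_cache into a FlushRequest and hands
    // it to the flush_job thread over a channel, mirroring the steps above.
    let (tx, rx) = mpsc::channel::<FlushRequest>();

    let flush_job = thread::spawn(move || {
        for req in rx {
            // In the real flow: build a flush_task, write a TSM file,
            // update levels_info, and send a version_edit to the summary task.
            println!("flushing {} points of tsfamily {}", req.points.len(), req.tsf_id);
        }
    });

    tx.send(FlushRequest { tsf_id: 0, points: vec![(1, 1_690_000_000, 1.0)] }).unwrap();
    drop(tx); // closing the channel lets the flush thread exit
    flush_job.join().unwrap();
}
```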
compaction
We organize data with an LSM-tree-like approach. Typically, data in a time series database is written in chronological order, but IoT backfill scenarios produce out-of-order timestamps, and in public cloud scenarios network delays make it hard to guarantee write order for all users. Facing these varied write patterns, we need to consider a variety of complex scenarios when performing compaction.

The purposes of compaction are:
- Merge small TSM files into larger TSM files.
- Clean up data that has expired or is marked for deletion.
- Reduce read amplification and maintain the `level_info` metadata in the current version.
level_range compaction
- Typically, time series data is written in order; to handle out-of-order data, we add delta files. Delta data is flushed to the L0 level.
- From L1 to L3, the data in `LevelInfo` is organized by time. Each level has a fixed, non-overlapping time range, and the data in memcache also has a fixed time range. Each level's time range is updated dynamically during compaction or flush.
- Each newly written TSM file carries the latest time range of its level, i.e. `TimeRange(ts_min, ts_max)`; `ts_max` is the largest timestamp held by the TSM file with the largest file id in the L0 level.
- The compaction pick process creates a virtual `time_window` (see the sketch after this list). The `time_window` selects the appropriate TSM files in the current level for compaction into the next level, while updating the current level's `level_info`: its `ts_min` is advanced to the maximum timestamp of the `time_window`, so the level's time range moves forward. The newly generated TSM files are placed in the next level, and the next level's `ts_max` is advanced to the maximum value of the `time_window`.
- Starting from L3, TSM files are divided into directories by table, with TSM files of the same table placed together. This supports generating Parquet files and tiering them to S3.
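A toy version of the pick step: given a level's file metadata and a virtual time_window, select the files that fall entirely inside the window so they can be merged into the next level. Types and thresholds here are illustrative, not the actual compaction planner.

```rust
/// Hypothetical per-file metadata for this sketch.
#[derive(Clone, Debug, PartialEq)]
struct TsmFileMeta { file_id: u64, ts_min: i64, ts_max: i64 }

/// Pick the TSM files of a level whose time ranges lie inside the window;
/// after compacting them down, the level's ts_min advances past the window.
fn pick_for_window(level: &[TsmFileMeta], window: (i64, i64)) -> Vec<TsmFileMeta> {
    level
        .iter()
        .filter(|f| f.ts_min >= window.0 && f.ts_max <= window.1)
        .cloned()
        .collect()
}

fn main() {
    let level1 = vec![
        TsmFileMeta { file_id: 1, ts_min: 0, ts_max: 99 },
        TsmFileMeta { file_id: 2, ts_min: 100, ts_max: 199 },
    ];
    // Compact everything with timestamps below 100 into the next level.
    let picked = pick_for_window(&level1, (0, 99));
    assert_eq!(picked.len(), 1);
    assert_eq!(picked[0].file_id, 1);
}
```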
time_window compaction
Window-based compaction behaves differently from the level_range mode. When the immut_cache is flushed to disk, the generated TSM files are placed into the corresponding windows based on their time ranges, and windows are created dynamically as time advances. Each window is responsible for writes within a certain time span.
Within a window there are scattered TSM file blocks that need to be merged into larger blocks, and each window maintains a list of file metadata. Compared with the level_range compaction mode, the time_window mode reduces write amplification.
Data Engine data flow
Other System Design
Tenant Isolation
query layer
In DataFusion, the catalog hierarchy is divided into `catalog/schema/table`. We reuse this hierarchy to isolate tenants as `tenant (namespace) / database / table` (see the sketch after this list).
- Table corresponds to a specific table under a specific database; it provides the table schema definition and implements TableProvider.
- Database corresponds to a Schema, which manages the multiple tables under a specific database.
- Namespace corresponds to Catalog. Each tenant occupies exactly one catalog, so the dbs seen by different tenants are different and different tenants may use the same database name. When a user logs in, the TenantID in the session determines which namespace they see by default, which gives namespaces a soft-isolation effect.
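A trivial illustration of the soft isolation: the tenant taken from the session becomes the catalog prefix, so two tenants can use the same database and table names without colliding. The helper below is hypothetical.

```rust
/// Resolve the fully qualified name a tenant's query sees, following the
/// tenant (namespace) / database / table mapping described above.
fn qualified_name(tenant_id: &str, database: &str, table: &str) -> String {
    format!("{tenant_id}.{database}.{table}")
}

fn main() {
    // Different tenants may use the same database name; the tenant id from
    // the session keeps their catalogs softly isolated.
    assert_eq!(qualified_name("tenant_a", "db1", "cpu"), "tenant_a.db1.cpu");
    assert_eq!(qualified_name("tenant_b", "db1", "cpu"), "tenant_b.db1.cpu");
}
```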
tskv layer
The directory partitioning policy mentioned above:
/User/db/bucket/replicaset_id/vnode_id
tskv is an instance on each Node. It holds all Vnode information on the current Node, and each Vnode stores its data under a separate directory. Data is cleaned up according to the db's configured retention policy. At the same time, we can easily gather per-directory size statistics so that tenants can be billed.