Cheap TimeSeries Database

这是一个将 DynamoDB 作为 TimeSeries 数据库使用的便宜的替代方案.

test.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4我有 1 台服务器.
  5
  6Pricing Fact:
  7
  8- 1 WCU = 1KB.
  9- 1 RCU = 4KB. 如果是 eventual consistent read 则减半.
 10- 1M WCU = $1.25
 11- 1M RCU = $0.25
 12- $0.25 per GB/Month (对于我们的 App 来说可以忽略不计)
 13
 14Workload Fact:
 15
 16- 平均 30 秒测量一次, 1 天测量 2880 次.
 17- 每次测量的数据大小大约在 0.25 KB, 但是要消耗 1 WCU.
 18- 平均 15 分钟查询一次最近 3 小时的服务器的测量数据, 1 天查询 96 次.
 19- 3 小时的数据大约是 3 * 60 * 2 = 360 个 item, 一共 90KB, 也就是消耗 90 RCU (按 1KB = 1 RCU 保守估算; 若按上面的 1 RCU = 4KB 计算则约为 23 RCU).
 20
 21Cost:
 22
 23- Write: 1 天测量 2880 次乘以每次消耗 1 WCU, 共计 2880 WCU. 也就是 1.25 * 2880 / 1000000 = $0.0036/天, 等于 $0.108/月.
 24- Read: 1 天查询 96 次乘以每次消耗 90 RCU, 共计 8640 RCU. 也就是 0.25 * 8640 / 1000000 / 2 = $0.00108/天, 等于 $0.0324/月.
 25- 也就是为了测量一台服务器的使用情况需要花费 0.108 + 0.0324 = 0.1404 美元/月. 6 台服务器也就是一个月不到 $1 (约 $0.84).
 26    一个 RDS db.t4g.small 空转 5 小时大约就是 0.16 美元, 所以是有必要测量的.
 27"""
 28
 29import polars as pl
 30import pynamodb_mate.api as pm
 31from datetime import datetime
 32
 33class Measurement(pm.Model):
 34    class Meta:
 35        table_name = "measurement"
 36        region = "us-east-1"
 37        billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE
 38
 39    key = pm.UnicodeAttribute(hash_key=True)
 40    ts = pm.UTCDateTimeAttribute(range_key=True)
 41    cpu_usage = pm.NumberAttribute()
 42    memory_usage = pm.NumberAttribute()
 43
 44    @classmethod
 45    def query_between(
 46        cls,
 47        key: str,
 48        start_time: datetime,
 49        end_time: datetime,
 50    ):
 51        return cls.iter_query(
 52            hash_key=key,
 53            range_key_condition=cls.ts.between(start_time, end_time),
 54        )
 55
 56    @classmethod
 57    def list_to_df(
 58        cls,
 59        items: list,
 60    ) -> pl.DataFrame:
 61        print(cls.get_attributes())
 62        cols = list(cls.get_attributes())
 63        cols.remove("key")
 64        cols.remove("ts")
 65        cols = ["key", "ts"] + cols
 66        df = pl.DataFrame(
 67            [item.attribute_values for item in items],
 68            schema=cols,
 69        )
 70        return df
 71
 72
 73if __name__ == "__main__":
 74    import random
 75
 76    import moto
 77    from datetime import timedelta, timezone
 78    from rich import print as rprint
 79    from boto_session_manager import BotoSesManager
 80
 81    def get_utc_now():
 82        return datetime.utcnow().replace(tzinfo=timezone.utc)
 83
 84    mock_aws = moto.mock_aws()
 85    mock_aws.start()
 86
 87    bsm = BotoSesManager(region_name="us-east-1")
 88    print(f"{bsm.aws_account_id = }")
 89    Measurement.create_table(wait=True)
 90
 91    start_time = get_utc_now()
 92    ec2_inst_id = "i-1234567890abcdef0"
 93    with Measurement.batch_write() as batch:
 94        for i in range(10):
 95            measurement = Measurement(
 96                key=ec2_inst_id,
 97                ts=start_time + timedelta(seconds=i),
 98                cpu_usage=random.randint(5, 95),
 99                memory_usage=random.randint(5, 95),
100            )
101            batch.save(measurement)
102
103    measurements = Measurement.query_between(
104        ec2_inst_id,
105        start_time=start_time,
106        end_time=start_time + timedelta(seconds=10),
107    ).all()
108
109    rprint([item.attribute_values for item in measurements])
110
111    df = Measurement.list_to_df(items=measurements)
112
113    mock_aws.stop()