diff --git a/Makefile b/Makefile index dee65ce0..fa4a7a3f 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ default: tidy fmt lint build build: tidb pocket tpcc ledger txn-rand-pessimistic on-dup sqllogic block-writer \ region-available deadlock-detector crud bank bank2 abtest cdc-pocket tiflash-pocket vbank \ - read-stress rawkv-linearizability tiflash-abtest tiflash-cdc follower-read + read-stress rawkv-linearizability tiflash-abtest tiflash-cdc follower-read write-stress tidb: $(GOBUILD) $(GOMOD) -o bin/chaos-tidb cmd/tidb/main.go @@ -97,6 +97,9 @@ tiflash-cdc: follower-read: $(GOBUILD) $(GOMOD) -o bin/follower-read cmd/follower-read/*.go +write-stress: + $(GOBUILD) $(GOMOD) -o bin/write-stress cmd/write-stress/*.go + fmt: groupimports go fmt ./... diff --git a/cmd/write-stress/main.go b/cmd/write-stress/main.go new file mode 100644 index 00000000..da7cac18 --- /dev/null +++ b/cmd/write-stress/main.go @@ -0,0 +1,61 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + + // use mysql + _ "github.com/go-sql-driver/mysql" + + test_infra "github.com/pingcap/tipocket/pkg/test-infra" + writestress "github.com/pingcap/tipocket/tests/write-stress" + + "github.com/pingcap/tipocket/cmd/util" + "github.com/pingcap/tipocket/pkg/cluster" + "github.com/pingcap/tipocket/pkg/control" + "github.com/pingcap/tipocket/pkg/test-infra/fixture" +) + +var ( + dataNum = flag.Int("dataNum", 2000, "the number of data(the unit is 10 thoudstand)") + concurrency = flag.Int("concurrency", 400, "concurrency of worker") + batch = flag.Int("batch", 100, "batch of insert sql") +) + +func main() { + flag.Parse() + cfg := control.Config{ + Mode: control.ModeSelfScheduled, + ClientCount: 1, + RunTime: fixture.Context.RunTime, + RunRound: 1, + } + kvs := []string{"127.0.0.1:20160", "127.0.0.1:20162", "127.0.0.1:20161"} + suit := util.Suit{ + Config: &cfg, + //Provisioner: cluster.NewK8sProvisioner(), + Provisioner: cluster.NewLocalClusterProvisioner([]string{"127.0.0.1:4000"}, []string{"127.0.0.1:2379"}, kvs), + ClientCreator: writestress.ClientCreator{Cfg: &writestress.Config{ + DataNum: *dataNum, + Concurrency: *concurrency, + Batch: *batch, + }}, + NemesisGens: util.ParseNemesisGenerators(fixture.Context.Nemesis), + ClusterDefs: test_infra.NewDefaultCluster(fixture.Context.Namespace, fixture.Context.Namespace, + fixture.Context.TiDBClusterConfig), + } + suit.Run(context.Background()) +} diff --git a/tests/write-stress/write_stress.go b/tests/write-stress/write_stress.go new file mode 100644 index 00000000..68df51e2 --- /dev/null +++ b/tests/write-stress/write_stress.go @@ -0,0 +1,207 @@ +package writestress + +import ( + "context" + "database/sql" + "encoding/binary" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/juju/errors" + "github.com/ngaut/log" + + "github.com/pingcap/tipocket/pkg/cluster/types" + "github.com/pingcap/tipocket/pkg/core" + "github.com/pingcap/tipocket/util" +) + +// Table schema comes from the bank of pufa `tmp_jieb_instmnt_daily` +// CREATE TABLE `tmp_jieb_instmnt_daily` ( +// `ID` bigint(20) DEFAULT NULL COMMENT '主键ID', +// `TABLE_ID` int(11) NOT NULL COMMENT '分库ID', +// `FILE_DATE` char(8) NOT NULL COMMENT '文件日期', +// `CONTRACT_NO` varchar(128) NOT NULL COMMENT '借据号', +// `SETTLE_DATE` char(8) NOT NULL COMMENT '减免会计日期', +// `TERM_NO` int(11) NOT NULL COMMENT '期次号', +// +// `INPT_DATE` char(8) DEFAULT NULL COMMENT '录入日期', +// `INPT_TIME` varchar(20) DEFAULT NULL COMMENT '录入时间', +// `RCRD_ST_CODE` varchar(1) DEFAULT NULL COMMENT '记录状态代码', +// UNIQUE KEY `TMP_JIEB_INSTMNT_DAILY_IDX1` (`CONTRACT_NO`,`TERM_NO`), +// KEY `TMP_JIEB_INSTMNT_DAILY_IDX2` (`TABLE_ID`,`CONTRACT_NO`) +// ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=5 PRE_SPLIT_REGIONS=5 */ COMMENT='借呗日终(分期)信息临时表'; +const ( + stmtDrop = `DROP TABLE IF EXISTS write_stress` + stmtCreate = ` + CREATE TABLE write_stress ( + TABLE_ID int(11) NOT NULL COMMENT '分库ID', + CONTRACT_NO varchar(128) NOT NULL COMMENT '借据号', + TERM_NO int(11) NOT NULL COMMENT '期次号', + NOUSE char(60) NOT NULL COMMENT '填充位', + + UNIQUE KEY TMP_JIEB_INSTMNT_DAILY_IDX1 (CONTRACT_NO, TERM_NO), + KEY TMP_JIEB_INSTMNT_DAILY_IDX2 (TABLE_ID, CONTRACT_NO) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; +` +) + +// Config is for writestressClient +type Config struct { + DataNum int `toml:"dataNum"` + Concurrency int `toml:"concurrency"` + Batch int `toml:"batch"` +} + +// ClientCreator creates writestressClient +type ClientCreator struct { + Cfg *Config +} + +// Create ... +func (l ClientCreator) Create(node types.ClientNode) core.Client { + return &writestressClient{ + Config: l.Cfg, + } +} + +// ledgerClient simulates a complete record of financial transactions over the +// life of a bank (or other company). +type writestressClient struct { + *Config + db *sql.DB + contract_ids [][]byte +} + +func (c *writestressClient) SetUp(ctx context.Context, nodes []types.ClientNode, idx int) error { + if idx != 0 { + return nil + } + + var err error + node := nodes[idx] + dsn := fmt.Sprintf("root@tcp(%s:%d)/test", node.IP, node.Port) + + log.Infof("start to init...") + c.db, err = util.OpenDB(dsn, c.Concurrency) + if err != nil { + return err + } + defer func() { + log.Infof("init end...") + }() + + if _, err := c.db.Exec(stmtDrop); err != nil { + log.Fatalf("execute statement %s error %v", stmtDrop, err) + } + + if _, err := c.db.Exec(stmtCreate); err != nil { + log.Fatalf("execute statement %s error %v", stmtCreate, err) + } + + return nil +} + +func (c *writestressClient) TearDown(ctx context.Context, nodes []types.ClientNode, idx int) error { + return nil +} + +func (c *writestressClient) Invoke(ctx context.Context, node types.ClientNode, r interface{}) core.UnknownResponse { + panic("implement me") +} + +func (c *writestressClient) NextRequest() interface{} { + panic("implement me") +} + +func (c *writestressClient) DumpState(ctx context.Context) (interface{}, error) { + panic("implement me") +} + +func (c *writestressClient) Start(ctx context.Context, cfg interface{}, clientNodes []types.ClientNode) error { + log.Infof("start to test...") + defer func() { + log.Infof("test end...") + }() + c.contract_ids = make([][]byte, c.DataNum) + timeUnix := time.Now().Unix() + count := uint8(0) + b := make([]byte, 8) + for i := 0; i < c.DataNum; i++ { + // "abcd" + timestamp(8 bit) + count(8 bit) + c.contract_ids[i] = append(c.contract_ids[i], []byte("abcd")...) + binary.LittleEndian.PutUint64(b, uint64(timeUnix)) + c.contract_ids[i] = append(c.contract_ids[i], b...) + binary.LittleEndian.PutUint64(b, uint64(count)) + c.contract_ids[i] = append(c.contract_ids[i], b...) + + count++ + if count == 0 { + timeUnix++ + } + } + + var wg sync.WaitGroup + for i := 0; i < c.Concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + } + if err := c.ExecuteInsert(c.db, i); err != nil { + //log.Errorf("exec failed %v", err) + } + } + }(i) + } + + wg.Wait() + return nil +} + +// ExecuteInsert is run case +func (c *writestressClient) ExecuteInsert(db *sql.DB, pos int) error { + num := c.Config.DataNum * 10000 / c.Config.Concurrency + + tx, err := db.Begin() + if err != nil { + return errors.Trace(err) + } + defer tx.Rollback() + str := make([]byte, 50) + rnd := rand.New(rand.NewSource(time.Now().Unix())) + for i := 0; i < num/c.Config.Batch; i++ { + n := num*pos + i*c.Config.Batch + if n >= c.DataNum { + break + } + query := fmt.Sprintf(`INSERT INTO write_stress (TABLE_ID, CONTRACT_NO, TERM_NO, NOUSE) VALUES `) + for j := 0; j < c.Config.Batch; j++ { + n := num*pos + i*c.Config.Batch + j + if n >= c.DataNum { + break + } + contract_id := c.contract_ids[] + util.RandString(str, rnd) + if j != 0 { + query += "," + } + query += fmt.Sprintf(`(%v, %v, %v, %v)`, rnd.Uint32()%960+1, string(contract_id[:]), rnd.Uint32()%36+1, string(str[:])) + } + fmt.Println(query) + if _, err := tx.Exec(query); err != nil { + return errors.Trace(err) + } + } + + if err := tx.Commit(); err != nil { + return errors.Trace(err) + } + + return nil +}