-
Notifications
You must be signed in to change notification settings - Fork 2
/
segment.go
121 lines (104 loc) · 2.76 KB
/
segment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package nimbusdb
import (
"os"
"path/filepath"
)
// getSegmentBlock looks up block blockNumber inside the segment that db holds
// for path. The boolean is false when either the segment or the block is not
// present; the returned pointer is nil in that case.
func (db *Db) getSegmentBlock(path string, blockNumber int64) (*BlockOffsetPair, bool) {
	if seg, found := db.segments[path]; found {
		// A missing block yields the zero value (nil) together with false.
		blk, present := seg.blocks[blockNumber]
		return blk, present
	}
	return nil, false
}
// setSegment stores segment under path in db's segment table, replacing any
// entry already stored for that path.
func (db *Db) setSegment(path string, segment *Segment) {
	db.segments[path] = segment
}
// getSegment returns the segment stored under path in db's segment table and
// whether such an entry exists.
func (db *Db) getSegment(path string) (*Segment, bool) {
	seg, found := db.segments[path]
	return seg, found
}
// getBlockOffset returns the segment's current block offset.
func (seg *Segment) getBlockOffset() int64 {
	return seg.currentBlockOffset
}
// getBlockNumber returns the segment's current block number.
func (seg *Segment) getBlockNumber() int64 {
	return seg.currentBlockNumber
}
// getWriter returns the file handle stored as the segment's writer.
func (seg *Segment) getWriter() *os.File {
	return seg.writer
}
// getPath returns the path recorded on the segment.
func (seg *Segment) getPath() string {
	return seg.path
}
// setPath records path on the segment.
func (seg *Segment) setPath(path string) {
	seg.path = path
}
// setWriter stores fp as the segment's writer file handle.
func (seg *Segment) setWriter(fp *os.File) {
	seg.writer = fp
}
// closeWriter closes the segment's writer file handle and marks the segment
// as closed. Calling it on a segment that is already marked closed does
// nothing and returns nil.
//
// The segment is marked closed even when Close reports an error: an os.File
// is unusable once Close has been called and a repeated Close only returns an
// error, so leaving the flag unset would make every later call fail without
// ever being able to succeed.
//
// It returns the error reported by the underlying Close call, if any.
func (seg *Segment) closeWriter() error {
	if seg.closed {
		return nil
	}
	// Flip the flag first so a failing Close is not retried on a dead handle.
	seg.closed = true
	return seg.writer.Close()
}
// setBlockNumber sets the segment's current block number to blockNumber.
func (seg *Segment) setBlockNumber(blockNumber int64) {
	seg.currentBlockNumber = blockNumber
}
// setBlockOffset sets the segment's current block offset to blockOffset.
func (seg *Segment) setBlockOffset(blockOffset int64) {
	seg.currentBlockOffset = blockOffset
}
// setBlock stores block under blockNumber in the segment's block table,
// replacing any block already stored for that number.
func (seg *Segment) setBlock(blockNumber int64, block *BlockOffsetPair) {
	seg.blocks[blockNumber] = block
}
// getSegmentFilePointerFromPath opens the file keyDirPath, resolved relative
// to the database directory, in read-only mode and returns the handle. The
// file is not closed here. On failure the handle is nil and the error from
// opening the file is returned unchanged.
func (db *Db) getSegmentFilePointerFromPath(keyDirPath string) (*os.File, error) {
	return os.Open(filepath.Join(db.dirPath, keyDirPath))
}
// updateSegment accounts for the entry described by kdValue in segment and
// stores segment in db under kdValue.path.
//
// The block that segment currently tracks is fetched from the segment db
// already holds for kdValue.path; when it cannot be found,
// ERROR_CANNOT_READ_FILE is returned together with a nil segment. Otherwise
// that block's end offset is moved to the end of the entry. If the entry
// still fits within BlockSize at the segment's current block offset, it is
// assigned to the current block and the block offset advances by the entry
// size. If it does not fit, a new block covering just the entry is started
// and the block offset restarts at the entry size. kdValue.blockNumber is set
// to the block the entry was assigned to.
//
// NOTE(review): the current block's end offset is extended before the fit
// check, so on rollover the previous block's range also spans the entry that
// opens the next block — confirm this overlap is intended.
func (db *Db) updateSegment(kdValue *KeyDirValue, segment *Segment) (*Segment, error) {
	active, found := db.getSegmentBlock(kdValue.path, segment.currentBlockNumber)
	if !found {
		return nil, ERROR_CANNOT_READ_FILE
	}

	entryEnd := kdValue.offset + kdValue.size
	active.endOffset = entryEnd
	segment.blocks[segment.currentBlockNumber] = active

	if segment.currentBlockOffset+kdValue.size > BlockSize {
		// The entry does not fit: open the next block with this entry.
		segment.currentBlockNumber++
		segment.blocks[segment.currentBlockNumber] = &BlockOffsetPair{
			startOffset: kdValue.offset,
			endOffset:   entryEnd,
			filePath:    kdValue.path,
		}
		segment.currentBlockOffset = kdValue.size
	} else {
		segment.currentBlockOffset += kdValue.size
	}

	kdValue.blockNumber = segment.currentBlockNumber
	db.segments[kdValue.path] = segment
	return segment, nil
}
// createNewSegment builds a Segment for kdValue.path whose block 0 spans the
// entry described by kdValue, from its offset to its offset plus size. The
// current block number and current block offset both start at zero.
func createNewSegment(kdValue *KeyDirValue) *Segment {
	first := &BlockOffsetPair{
		startOffset: kdValue.offset,
		endOffset:   kdValue.offset + kdValue.size,
		filePath:    kdValue.path,
	}
	return &Segment{
		path:               kdValue.path,
		blocks:             map[int64]*BlockOffsetPair{0: first},
		currentBlockNumber: 0,
		currentBlockOffset: 0,
	}
}