-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathstorage.go
109 lines (90 loc) · 2.78 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main
import (
"fmt"
"github.com/go-orm/gorm"
_ "github.com/go-orm/gorm/dialects/sqlite"
dynamicstruct "github.com/ompluscator/dynamic-struct"
log "github.com/sirupsen/logrus"
"path"
)
// DBStorage bundles a gorm database handle with a record of the table names
// that CreateTable has already processed.
type DBStorage struct {
// Embedded gorm handle; its methods (Table, HasTable, ...) are promoted onto DBStorage.
*gorm.DB
// Tables is keyed by table name. CreateTable adds an entry once it has handled
// a table and skips any name that is already present.
Tables map[string]bool
}
// NewDBStorage opens the "collections.db" database located under dBPath using
// the "sqlite3" gorm dialect and wraps it in a DBStorage with an empty table
// registry.
//
// It returns the error reported by gorm.Open unchanged when the database
// cannot be opened.
//
// NOTE(review): path.Join always joins with forward slashes; filepath.Join is
// the OS-aware variant for filesystem paths — consider switching.
func NewDBStorage(dBPath string) (*DBStorage, error) {
	dbFile := path.Join(dBPath, "collections.db")

	conn, openErr := gorm.Open("sqlite3", dbFile)
	if openErr != nil {
		return nil, openErr
	}

	storage := &DBStorage{
		DB:     conn,
		Tables: map[string]bool{},
	}
	return storage, nil
}
// CreateTable makes sure a table called tableName exists with one column per
// entry in fields, on top of the columns contributed by gorm.Model.
//
// A struct type is assembled at runtime: each field name is passed through
// Capitalize and given a Go type derived from field.Type ("float" -> float64,
// "int" -> int, anything else -> the type of field.Type itself, i.e. string
// data). If the table already exists it is auto-migrated, otherwise it is
// created.
//
// Table names that were handled successfully are remembered in db.Tables so
// later calls return immediately. When the migration or creation fails the
// error is logged and the name is NOT remembered, so a later call can retry.
func (db *DBStorage) CreateTable(tableName string, fields []MapValueField) {
	log.Debugf("Creating table: %s on database", tableName)
	if _, ok := db.Tables[tableName]; ok {
		log.Debugf("Table %s already exists, skipping", tableName)
		return
	}

	builder := dynamicstruct.ExtendStruct(gorm.Model{})
	for _, field := range fields {
		name := Capitalize(field.Name)
		// The second AddField argument is only a sample value whose Go type
		// determines the type of the generated struct field.
		switch field.Type {
		case "float":
			builder.AddField(name, 0.0, "")
		case "int":
			builder.AddField(name, 0, "")
		default:
			// defaults to string type
			builder.AddField(name, field.Type, "")
		}
	}
	model := builder.Build().New()

	table := db.Table(tableName)
	var err error
	if table.HasTable(model) {
		err = table.AutoMigrate(model).Error
	} else {
		err = table.CreateTable(model).Error
	}
	if err != nil {
		// Do not cache the table as created: leaving it out of db.Tables lets
		// a subsequent call attempt the operation again.
		log.Errorf("Unable to create or migrate table %s: %v", tableName, err)
		return
	}

	db.Tables[tableName] = true
}
// InsertRecord describes one pending row insertion. CreateRecord builds these
// and sends them on a task's DBOpsQueue channel.
type InsertRecord struct {
// TableName is the table the row is destined for.
TableName string
// FieldNames lists the column names; when built by CreateRecord the first
// entry is "created_at", followed by the names of the mapped fields.
FieldNames []string
// Values holds the formatted values, positionally matching FieldNames.
Values []string
}
// CreateRecord validates values against fields and, if the task's scheduler
// has not been stopped, sends an InsertRecord for tableName on the task's
// DBOpsQueue channel.
//
// Steps, in order:
//  1. values is passed through RemoveEmptyFromSlice; an error is returned if
//     nothing is left.
//  2. every field's Index is checked against the number of remaining values.
//  3. an empty tableName or nil fields/values is rejected.
//  4. the column list ("created_at" followed by each field name) and the value
//     list ("datetime('now')" followed by each field's Format(values) result)
//     are built and queued.
//
// It returns nil both when the record was queued and when the scheduler was
// already stopped (in which case nothing is sent).
func (db *DBStorage) CreateRecord(task *SchedulerTask, tableName string, fields []MapValueField, values []string) error {
	RemoveEmptyFromSlice(&values)
	if len(values) <= 0 {
		return fmt.Errorf("Empty set of values returned, skipping")
	}

	// NOTE(review): an Index equal to len(values) passes this check; if Format
	// uses Index as a 0-based offset that would be out of range — confirm.
	for _, f := range fields {
		if f.Index > len(values) {
			return fmt.Errorf(
				"Not found value that matches field: %s with idx: %d in returned values (length: %d)",
				f.Name, f.Index, len(values))
		}
	}

	if tableName == "" || fields == nil || values == nil {
		return fmt.Errorf("Skipping data insertion, nil values passed to insertIntoDb")
	}
	log.Debugf("creating new record entry on table: %s", tableName)

	// The first column/value pair is always the creation timestamp; the rest
	// follow the order of fields.
	columns := make([]string, 0, len(fields)+1)
	formatted := make([]string, 0, len(fields)+1)
	columns = append(columns, "created_at")
	formatted = append(formatted, "datetime('now')")
	for _, f := range fields {
		columns = append(columns, f.Name)
		formatted = append(formatted, f.Format(values))
	}

	if task.Scheduler.Stopped {
		return nil
	}
	record := &InsertRecord{
		TableName:  tableName,
		FieldNames: columns,
		Values:     formatted,
	}
	*task.DBOpsQueue <- record
	return nil
}