From e7d3c3ad3fc49dd6fdafc686241de1075b23987b Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:36:36 +0100 Subject: [PATCH 1/9] simplify connection string configuration --- cmd/timescaledb-parallel-copy/main.go | 5 +- internal/db/db.go | 81 +----------------- internal/db/db_internal_test.go | 118 -------------------------- pkg/csvcopy/csvcopy.go | 42 ++------- pkg/csvcopy/csvcopy_test.go | 4 +- pkg/csvcopy/options.go | 26 ++++++ 6 files changed, 38 insertions(+), 238 deletions(-) create mode 100644 pkg/csvcopy/options.go diff --git a/cmd/timescaledb-parallel-copy/main.go b/cmd/timescaledb-parallel-copy/main.go index 1088352..81cda6b 100644 --- a/cmd/timescaledb-parallel-copy/main.go +++ b/cmd/timescaledb-parallel-copy/main.go @@ -51,8 +51,10 @@ var ( // Parse args func init() { + // Documented https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING flag.StringVar(&postgresConnect, "connection", "host=localhost user=postgres sslmode=disable", "PostgreSQL connection url") - flag.StringVar(&dbName, "db-name", "", "Database where the destination table exists") + // Deprecated, use connection + // flag.StringVar(&dbName, "db-name", "", "Database where the destination table exists") flag.StringVar(&tableName, "table", "test_table", "Destination table for insertions") flag.StringVar(&schemaName, "schema", "public", "Destination table's schema") flag.BoolVar(&truncate, "truncate", false, "Truncate the destination table before insert") @@ -92,7 +94,6 @@ func main() { copier, err := csvcopy.NewCopier( postgresConnect, - dbName, schemaName, tableName, copyOptions, diff --git a/internal/db/db.go b/internal/db/db.go index 2604169..e5580a1 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -6,50 +6,11 @@ import ( "io" "os" "regexp" - "strconv" - "strings" - "github.com/jackc/pgconn" "github.com/jackc/pgx/v4/stdlib" "github.com/jmoiron/sqlx" ) -// minimalConnConfig is the minimal settings we need for connection. More -// unusual options are currently not supported. -type minimalConnConfig struct { - host string - user string - password string - db string - port uint16 - sslmode string -} - -// DSN returns the PostgreSQL compatible DSN string that corresponds to mcc. -// This is expressed as a string of = separated by spaces. -func (mcc *minimalConnConfig) DSN() string { - var s strings.Builder - writeNonempty := func(key, val string) { - if val != "" { - _, err := s.WriteString(key + "=" + val + " ") - if err != nil { - panic(err) - } - } - } - writeNonempty("host", mcc.host) - writeNonempty("user", mcc.user) - writeNonempty("password", mcc.password) - writeNonempty("dbname", mcc.db) - if mcc.port != 0 { - writeNonempty("port", strconv.FormatUint(uint64(mcc.port), 10)) - } - writeNonempty("sslmode", mcc.sslmode) - writeNonempty("application_name", "timescaledb-parallel-copy") - - return strings.TrimSpace(s.String()) -} - // Overrideable is an interface for defining ways to override PG settings // outside of the usual manners (through the connection string/URL or env vars). // An example would be having specific flags that can be used to set database @@ -66,40 +27,6 @@ func (o OverrideDBName) Override() string { return string(o) } -// parseConnStr uses an external lib (that backs pgx) to take care of parsing -// connection parameters for connecting to PostgreSQL. It handles the connStr -// being in DSN or URL form, as well as reading env vars for additional settings. -func parseConnStr(connStr string, overrides ...Overrideable) (*minimalConnConfig, error) { - config, err := pgconn.ParseConfig(connStr) - if err != nil { - return nil, err - } - sslmode, err := determineTLS(connStr) - if err != nil { - return nil, err - } - - mcc := &minimalConnConfig{ - host: config.Host, - user: config.User, - password: config.Password, - db: config.Database, - port: config.Port, - sslmode: sslmode, - } - - for _, o := range overrides { - switch o.(type) { - case OverrideDBName: - mcc.db = o.Override() - default: - return nil, fmt.Errorf("unknown overrideable: %T=%s", o, o.Override()) - } - } - - return mcc, nil -} - // ErrInvalidSSLMode is the error when the provided SSL mode is not one of the // values that PostgreSQL supports. type ErrInvalidSSLMode struct { @@ -141,12 +68,8 @@ func determineTLS(connStr string) (string, error) { // Connect returns a SQLX database corresponding to the provided connection // string/URL, env variables, and any provided overrides. -func Connect(connStr string, overrides ...Overrideable) (*sqlx.DB, error) { - mcc, err := parseConnStr(connStr, overrides...) - if err != nil { - return nil, fmt.Errorf("could not connect: %v", err) - } - db, err := sqlx.Connect("pgx", mcc.DSN()) +func Connect(connStr string) (*sqlx.DB, error) { + db, err := sqlx.Connect("pgx", connStr) if err != nil { return nil, fmt.Errorf("could not connect: %v", err) } diff --git a/internal/db/db_internal_test.go b/internal/db/db_internal_test.go index dd458bf..a47cda7 100644 --- a/internal/db/db_internal_test.go +++ b/internal/db/db_internal_test.go @@ -11,95 +11,6 @@ func (o badOverride) Override() string { return "fail" } -func TestParseConnStr(t *testing.T) { - cases := []struct { - desc string - input string - overrides []Overrideable - envs map[string]string - want *minimalConnConfig - wantErr string - }{ - { - desc: "simple DSN", - input: "host=localhost user=test", - want: &minimalConnConfig{ - host: "localhost", - user: "test", - port: 5432, - }, - }, - { - desc: "simple URL", - input: "postgres://test@localhost", - want: &minimalConnConfig{ - host: "localhost", - user: "test", - port: 5432, - }, - }, - { - desc: "full URL", - input: "postgres://test:secretPassword!!+!@example:7182/foo?sslmode=require", - want: &minimalConnConfig{ - host: "example", - user: "test", - password: "secretPassword!!+!", - port: 7182, - db: "foo", - sslmode: "require", - }, - }, - { - desc: "DSN w/ override", - input: "host=localhost user=test dbname=foo", - overrides: []Overrideable{OverrideDBName("bar")}, - want: &minimalConnConfig{ - host: "localhost", - user: "test", - port: 5432, - db: "bar", - }, - }, - { - desc: "unknown override", - input: "host=localhost", - overrides: []Overrideable{badOverride("fail")}, - wantErr: "unknown overrideable: db.badOverride=fail", - }, - } - - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - if c.envs != nil { - for k, v := range c.envs { - if err := os.Setenv(k, v); err != nil { - t.Errorf("could not set env %s=%s", k, v) - return - } - } - } - - mcc, err := parseConnStr(c.input, c.overrides...) - if c.wantErr == "" { - if err != nil { - t.Errorf("unexpected error parsing: %v", err) - } - if got := mcc.DSN(); got != c.want.DSN() { - t.Errorf("incorrect result:\ngot\n%s\nwant\n%s", got, c.want.DSN()) - } - } else { - if err == nil { - t.Errorf("unexpected lack of error") - } - if got := err.Error(); got != c.wantErr { - t.Errorf("incorrect error:\ngot\n%s\nwant\n%s", got, c.wantErr) - } - } - }) - } -} - func TestDetermineTLS(t *testing.T) { cases := []struct { desc string @@ -193,32 +104,3 @@ func TestDetermineTLS(t *testing.T) { }) } } - -func TestDSN(t *testing.T) { - cases := []struct { - desc string - input minimalConnConfig - want string - }{ - { - desc: "", - input: minimalConnConfig{ - host: "localhost", - port: 5432, - user: "user", - password: "password", - db: "db", - sslmode: "prefer", - }, - want: "host=localhost user=user password=password dbname=db port=5432 sslmode=prefer application_name=timescaledb-parallel-copy", - }, - } - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - got := c.input.DSN() - if c.want != got { - t.Errorf("wanted %s got %s", c.want, got) - } - }) - } -} diff --git a/pkg/csvcopy/csvcopy.go b/pkg/csvcopy/csvcopy.go index 798a840..28c3eff 100644 --- a/pkg/csvcopy/csvcopy.go +++ b/pkg/csvcopy/csvcopy.go @@ -20,30 +20,6 @@ import ( const TAB_CHAR_STR = "\\t" -type Logger interface { - Infof(msg string, args ...interface{}) -} - -type noopLogger struct{} - -func (l *noopLogger) Infof(msg string, args ...interface{}) {} - -type Option func(c *Copier) - -func WithLogger(logger Logger) Option { - return func(c *Copier) { - c.logger = logger - } -} - -// WithReportingFunction sets the function that will be called at -// reportingPeriod with information about the copy progress -func WithReportingFunction(f ReportFunc) Option { - return func(c *Copier) { - c.reportingFunction = f - } -} - type Result struct { RowsRead int64 Duration time.Duration @@ -53,8 +29,7 @@ type Result struct { var HeaderInCopyOptionsError = errors.New("'HEADER' in copyOptions") type Copier struct { - dbURL string - overrides []db.Overrideable + connString string schemaName string tableName string copyOptions string @@ -75,8 +50,7 @@ type Copier struct { } func NewCopier( - dbURL string, - dbName string, + connString string, schemaName string, tableName string, copyOptions string, @@ -94,11 +68,6 @@ func NewCopier( verbose bool, options ...Option, ) (*Copier, error) { - var overrides []db.Overrideable - if dbName != "" { - overrides = append(overrides, db.OverrideDBName(dbName)) - } - if strings.Contains(strings.ToUpper(copyOptions), "HEADER") { return nil, HeaderInCopyOptionsError } @@ -124,8 +93,7 @@ func NewCopier( } copier := &Copier{ - dbURL: dbURL, - overrides: overrides, + connString: connString, schemaName: schemaName, tableName: tableName, copyOptions: copyOptions, @@ -160,7 +128,7 @@ func NewCopier( } func (c *Copier) Truncate() (err error) { - dbx, err := db.Connect(c.dbURL, c.overrides...) + dbx, err := db.Connect(c.connString) if err != nil { return fmt.Errorf("failed to connect to the database: %w", err) } @@ -295,7 +263,7 @@ func (e *ErrAtRow) Unwrap() error { // processBatches reads batches from channel c and copies them to the target // server while tracking stats on the write. func (c *Copier) processBatches(ctx context.Context, ch chan batch.Batch) (err error) { - dbx, err := db.Connect(c.dbURL, c.overrides...) + dbx, err := db.Connect(c.connString) if err != nil { return err } diff --git a/pkg/csvcopy/csvcopy_test.go b/pkg/csvcopy/csvcopy_test.go index 4836385..3e7ebb1 100644 --- a/pkg/csvcopy/csvcopy_test.go +++ b/pkg/csvcopy/csvcopy_test.go @@ -67,7 +67,7 @@ func TestWriteDataToCSV(t *testing.T) { writer.Flush() - copier, err := NewCopier(connStr, "test-db", "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 0, false) + copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 0, false) require.NoError(t, err) reader, err := os.Open(tmpfile.Name()) @@ -154,7 +154,7 @@ func TestErrorAtRow(t *testing.T) { writer.Flush() - copier, err := NewCopier(connStr, "test-db", "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 2, true, 0, false) + copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 2, true, 0, false) require.NoError(t, err) reader, err := os.Open(tmpfile.Name()) require.NoError(t, err) diff --git a/pkg/csvcopy/options.go b/pkg/csvcopy/options.go new file mode 100644 index 0000000..779dfbc --- /dev/null +++ b/pkg/csvcopy/options.go @@ -0,0 +1,26 @@ +package csvcopy + +type Option func(c *Copier) + +type Logger interface { + Infof(msg string, args ...interface{}) +} + +type noopLogger struct{} + +func (l *noopLogger) Infof(msg string, args ...interface{}) {} + +// WithLogger sets the logger where the application will print debug messages +func WithLogger(logger Logger) Option { + return func(c *Copier) { + c.logger = logger + } +} + +// WithReportingFunction sets the function that will be called at +// reportingPeriod with information about the copy progress +func WithReportingFunction(f ReportFunc) Option { + return func(c *Copier) { + c.reportingFunction = f + } +} From 1359d0ccbd29bed1fd758133a228c13a7a01c6cc Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:38:30 +0100 Subject: [PATCH 2/9] remove unised tests --- internal/db/db_internal_test.go | 106 -------------------------------- 1 file changed, 106 deletions(-) delete mode 100644 internal/db/db_internal_test.go diff --git a/internal/db/db_internal_test.go b/internal/db/db_internal_test.go deleted file mode 100644 index a47cda7..0000000 --- a/internal/db/db_internal_test.go +++ /dev/null @@ -1,106 +0,0 @@ -package db - -import ( - "os" - "testing" -) - -type badOverride string - -func (o badOverride) Override() string { - return "fail" -} - -func TestDetermineTLS(t *testing.T) { - cases := []struct { - desc string - input string - envs map[string]string - want string - wantErr error - }{ - { - desc: "DSN valid (require)", - input: "host=localhost sslmode=require", - want: "require", - }, - { - desc: "URL valid (disable)", - input: "postgres://localhost?sslmode=disable", - want: "disable", - }, - { - desc: "URL valid (allow)", - input: "postgresql://localhost?sslmode=allow", - want: "allow", - }, - { - desc: "DSN valid (prefer)", - input: "host=localhost sslmode=prefer", - want: "prefer", - }, - { - desc: "DSN valid (verify-ca)", - input: "host=localhost sslmode=verify-ca port=1234", - want: "verify-ca", - }, - { - desc: "DSN valid (verify-full)", - input: "sslmode=verify-full", - want: "verify-full", - }, - { - desc: "DSN invalid", - input: "sslmode=preferred", - wantErr: &ErrInvalidSSLMode{given: "preferred"}, - }, - { - desc: "missing, no env", - input: "host=localhost", - want: "", - }, - { - desc: "missing, valid env", - input: "host=localhost", - envs: map[string]string{envSSLMode: "prefer"}, - want: "prefer", - }, - { - desc: "missing, invalid env", - envs: map[string]string{envSSLMode: "who"}, - wantErr: &ErrInvalidSSLMode{given: "who"}, - }, - { - desc: "missing, no env", - want: "", - }, - } - - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - os.Unsetenv(envSSLMode) - if c.envs != nil { - for k, v := range c.envs { - if err := os.Setenv(k, v); err != nil { - t.Errorf("could not set env %s with value %s", k, v) - return - } - } - } - got, err := determineTLS(c.input) - if c.wantErr == nil { - if err != nil { - t.Errorf("unexpected error: %v", err) - } else if got != c.want { - t.Errorf("incorrect ssl mode: got %s want %s", got, c.want) - } - } else { - if err == nil { - t.Errorf("unexpected lack of error") - } else if err.Error() != c.wantErr.Error() { - t.Errorf("incorrect error:\ngot\n%v\nwant\n%v", got, c.wantErr) - } - } - }) - } -} From ea1f05fe075e5598ec9e3632b4c9e49fc3da5d9c Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:39:14 +0100 Subject: [PATCH 3/9] cleanup --- internal/db/db.go | 57 ----------------------------------------------- 1 file changed, 57 deletions(-) diff --git a/internal/db/db.go b/internal/db/db.go index e5580a1..210420e 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -4,68 +4,11 @@ import ( "context" "fmt" "io" - "os" - "regexp" "github.com/jackc/pgx/v4/stdlib" "github.com/jmoiron/sqlx" ) -// Overrideable is an interface for defining ways to override PG settings -// outside of the usual manners (through the connection string/URL or env vars). -// An example would be having specific flags that can be used to set database -// connect parameters. -type Overrideable interface { - Override() string -} - -// OverrideDBName is a type for overriding the database name used to connect. -// To use it, one casts a string of the database name as an OverrideDBName -type OverrideDBName string - -func (o OverrideDBName) Override() string { - return string(o) -} - -// ErrInvalidSSLMode is the error when the provided SSL mode is not one of the -// values that PostgreSQL supports. -type ErrInvalidSSLMode struct { - given string -} - -func (e *ErrInvalidSSLMode) Error() string { - return "invalid SSL mode: " + e.given -} - -const ( - // envSSLMode is the environment variable key for SSL mode. - envSSLMode = "PGSSLMODE" -) - -var sslmodeRegex = regexp.MustCompile("sslmode=([a-zA-Z-]+)") - -// determineTLS attempts to match SSL mode to a known PostgreSQL supported value. -func determineTLS(connStr string) (string, error) { - res := sslmodeRegex.FindStringSubmatch(connStr) - var sslmode string - if len(res) == 2 { - sslmode = res[1] - } else { - sslmode = os.Getenv(envSSLMode) - } - - if sslmode == "" { - return "", nil - } - - switch sslmode { - case "require", "disable", "allow", "prefer", "verify-ca", "verify-full": - return sslmode, nil - default: - return "", &ErrInvalidSSLMode{given: sslmode} - } -} - // Connect returns a SQLX database corresponding to the provided connection // string/URL, env variables, and any provided overrides. func Connect(connStr string) (*sqlx.DB, error) { From 4d32354f908db97b12dc83383e1aa4f13cc07793 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:42:13 +0100 Subject: [PATCH 4/9] tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4708ade..c2a0336 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.30.0 github.com/testcontainers/testcontainers-go/modules/postgres v0.30.0 + golang.org/x/text v0.18.0 ) require ( @@ -71,7 +72,6 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/grpc v1.58.3 // indirect From 00d6ee62f3c1a853ea6c74512fabc35d84914f32 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:43:59 +0100 Subject: [PATCH 5/9] test with vendor --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c178b3d..3e233ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,8 +20,8 @@ jobs: - name: Check out code uses: actions/checkout@v2 - - name: Install dependencies - run: go mod download + # - name: Install dependencies + # run: go mod download - name: golangci-lint uses: golangci/golangci-lint-action@v4 From 82eb7ccdeca664aef06a49e57c1a992e71facf4c Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:45:00 +0100 Subject: [PATCH 6/9] fix linter warning --- cmd/timescaledb-parallel-copy/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/timescaledb-parallel-copy/main.go b/cmd/timescaledb-parallel-copy/main.go index 81cda6b..1c8e6be 100644 --- a/cmd/timescaledb-parallel-copy/main.go +++ b/cmd/timescaledb-parallel-copy/main.go @@ -45,8 +45,6 @@ var ( reportingPeriod time.Duration verbose bool showVersion bool - - dbName string ) // Parse args From 23f45a4af77bb1534b214c7f9c059c95399f1d72 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Mon, 2 Dec 2024 17:47:57 +0100 Subject: [PATCH 7/9] run go mod download again --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3e233ed..c178b3d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,8 +20,8 @@ jobs: - name: Check out code uses: actions/checkout@v2 - # - name: Install dependencies - # run: go mod download + - name: Install dependencies + run: go mod download - name: golangci-lint uses: golangci/golangci-lint-action@v4 From 0fb4cd1b1a720e46e4c558e5be531d15c1e31a31 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:13:41 +0100 Subject: [PATCH 8/9] fail properly --- cmd/timescaledb-parallel-copy/main.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/timescaledb-parallel-copy/main.go b/cmd/timescaledb-parallel-copy/main.go index 1c8e6be..9b0f5ca 100644 --- a/cmd/timescaledb-parallel-copy/main.go +++ b/cmd/timescaledb-parallel-copy/main.go @@ -45,14 +45,15 @@ var ( reportingPeriod time.Duration verbose bool showVersion bool + + dbName string ) // Parse args func init() { // Documented https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING flag.StringVar(&postgresConnect, "connection", "host=localhost user=postgres sslmode=disable", "PostgreSQL connection url") - // Deprecated, use connection - // flag.StringVar(&dbName, "db-name", "", "Database where the destination table exists") + flag.StringVar(&dbName, "db-name", "", "(deprecated) Database where the destination table exists") flag.StringVar(&tableName, "table", "test_table", "Destination table for insertions") flag.StringVar(&schemaName, "schema", "public", "Destination table's schema") flag.BoolVar(&truncate, "truncate", false, "Truncate the destination table before insert") @@ -86,10 +87,14 @@ func (l csvCopierLogger) Infof(msg string, args ...interface{}) { func main() { if showVersion { - fmt.Printf("%s %s (%s %s)\n", binName, version, runtime.GOOS, runtime.GOARCH) + log.Printf("%s %s (%s %s)\n", binName, version, runtime.GOOS, runtime.GOARCH) os.Exit(0) } + if dbName != "" { + log.Fatalf("Error: Deprecated flag -db-name is being used. Update -connection to connect to the given database") + } + copier, err := csvcopy.NewCopier( postgresConnect, schemaName, From c36703ead7777c9ca45e042096fc613731bb2a0e Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Tue, 3 Dec 2024 13:23:51 +0100 Subject: [PATCH 9/9] wops --- pkg/csvcopy/csvcopy_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/csvcopy/csvcopy_test.go b/pkg/csvcopy/csvcopy_test.go index d40c930..972c000 100644 --- a/pkg/csvcopy/csvcopy_test.go +++ b/pkg/csvcopy/csvcopy_test.go @@ -223,7 +223,7 @@ func TestWriteReportProgress(t *testing.T) { require.LessOrEqual(t, r.RowCount, int64(2)) } - copier, err := NewCopier(connStr, "test-db", "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 100*time.Millisecond, false, WithReportingFunction(reportF)) + copier, err := NewCopier(connStr, "public", "metrics", "CSV", ",", "", "", "device_id,label,value", false, 1, 1, 0, 5000, true, 100*time.Millisecond, false, WithReportingFunction(reportF)) require.NoError(t, err) reader, err := os.Open(tmpfile.Name())