Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sandbox gfr new jitter #1033

Closed
wants to merge 33 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
eb6c0b2
use datatype for active_tasks metric
Nov 25, 2021
fefd677
Add fast Ethernet and IP header extraction
Nov 23, 2021
39be57f
42% coverage
Nov 24, 2021
950d128
better slice allocation
Nov 24, 2021
f3d3530
use pointer for CI
Nov 25, 2021
11ffdd2
remove redundant wrap, make single copy of CaptureInfo
Nov 25, 2021
00699f5
small safety improvements
Nov 27, 2021
a8fc9ab
parse options, still copy packets
Nov 27, 2021
2645389
bypass option decoding
Nov 27, 2021
3870f80
use tcpip package
Nov 25, 2021
e1233cb
use copying read, but summarize instead of saving
Nov 27, 2021
6d271d2
copy the options to an allocated object
Nov 27, 2021
c37e17c
move some code to test, general cleanup
Nov 28, 2021
da4b74c
hacking jitter
Nov 28, 2021
96afa01
patching in tcp code
Nov 28, 2021
d5e4c0e
successfully patched tcp parsing into Summary.Add
Nov 28, 2021
0d41e89
check a few tcp stats
Nov 28, 2021
54a8848
suppress schema updates in conflict with other development
Nov 20, 2021
f130221
test for sacks and fix stats code
Nov 28, 2021
e5d7a24
many tcp decoding improvements
Nov 29, 2021
37f6c62
use explicit byte reverse decoding for timestamps, sacks, and remove …
Nov 29, 2021
12463ff
use BE16 and BE32
Dec 1, 2021
386ee14
better sack decoding
Dec 1, 2021
3a4df3b
hacking new option handling
Dec 1, 2021
ddc63dc
use tcpOption.GetMSS, etc
Dec 1, 2021
c076a86
checkpoint
Dec 1, 2021
bba96e7
hacking with escape and slices
Dec 1, 2021
4374c53
fix redundant option handling
Dec 1, 2021
5357e6c
refactor ParseOptions - slightly faster
Dec 1, 2021
eeefbac
escape opt and different options method
Dec 1, 2021
e107c06
add second benchmark
Dec 1, 2021
f75d06b
adjust some expected test values
Dec 1, 2021
c4f9d8f
fix missing ports that were interfering with option processing
Dec 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion active/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ type GCSSource struct {

pendingChan chan Runnable

label string
label string
datatype string
}

// NewGCSSource creates a new source for active processing.
Expand All @@ -113,6 +114,7 @@ func NewGCSSource(ctx context.Context, job tracker.Job, fl FileLister, toRunnabl

pendingChan: make(chan Runnable, 0),
label: job.Path(),
datatype: job.Datatype,
}

go src.streamToPending(ctx, job)
Expand All @@ -125,6 +127,10 @@ func (src *GCSSource) Label() string {
return src.label
}

// Datatype returns the job datatype captured at construction time,
// intended as a metric label (see metrics.ActiveTasks usage).
func (src *GCSSource) Datatype() string {
	dt := src.datatype
	return dt
}

// CancelStreaming terminates the streaming goroutine context.
func (src *GCSSource) CancelStreaming() {
src.ctx.cancel()
Expand Down
4 changes: 2 additions & 2 deletions active/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job track
debug.Println("Starting func")

f := func() (err error) {
metrics.ActiveTasks.WithLabelValues(rSrc.Label()).Inc()
defer metrics.ActiveTasks.WithLabelValues(rSrc.Label()).Dec()
metrics.ActiveTasks.WithLabelValues(rSrc.Datatype()).Inc()
defer metrics.ActiveTasks.WithLabelValues(rSrc.Datatype()).Dec()

// Capture any panic and convert it to an error.
defer func(tag string) {
Expand Down
3 changes: 3 additions & 0 deletions active/runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ type RunnableSource interface {

// Label returns a string for use in metrics and debug logs.
Label() string

// Datatype returns the datatype for use in metrics
Datatype() string
}
3 changes: 3 additions & 0 deletions active/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (s *source) Next(ctx context.Context) (active.Runnable, error) {
// Label returns a fixed label string for this test source.
func (s *source) Label() string {
	const fixedLabel = "label"
	return fixedLabel
}
// Datatype returns a fixed datatype string for this test source.
func (s *source) Datatype() string {
	const fixedDatatype = "datatype"
	return fixedDatatype
}

func (sr *statsRunnable) Run(ctx context.Context) error {
now := sr.stats.add()
Expand Down
3 changes: 3 additions & 0 deletions cmd/update-schema/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

// CreateOrUpdateTCPInfo will update existing TCPInfo table, or create new table if update fails.
func CreateOrUpdateTCPInfo(project string, dataset string, table string) error {
return nil // HACK to avoid collision with other sandbox work.
row := schema.TCPRow{}
schema, err := row.Schema()
rtx.Must(err, "TCPRow.Schema")
Expand All @@ -40,6 +41,7 @@ func CreateOrUpdateTCPInfo(project string, dataset string, table string) error {
}

func CreateOrUpdatePT(project string, dataset string, table string) error {
return nil // HACK to avoid collision with other sandbox work.
row := schema.PTTest{}
schema, err := row.Schema()
rtx.Must(err, "PTTest.Schema")
Expand Down Expand Up @@ -108,6 +110,7 @@ func CreateOrUpdateSwitchStats(project string, dataset string, table string) err
}

func CreateOrUpdatePCAPRow(project string, dataset string, table string) error {
return nil // HACK to avoid collision with other sandbox work.
row := schema.PCAPRow{}
schema, err := row.Schema()
rtx.Must(err, "PCAPRow.Schema")
Expand Down
88 changes: 2 additions & 86 deletions parser/pcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,19 @@ package parser
import (
"fmt"
"log"
"net"
"os"
"path/filepath"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
v2as "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/row"
"github.com/m-lab/etl/schema"
"github.com/m-lab/etl/tcpip"
"github.com/m-lab/go/logx"
)

Expand All @@ -30,87 +27,6 @@ var (
ErrNoIPLayer = fmt.Errorf("no IP layer")
)

// Packet struct contains the packet data and metadata.
type Packet struct {
	// Ci holds the capture metadata (timestamp, lengths) for this packet.
	// If we use a pointer here, for some reason we get zero value timestamps.
	Ci gopacket.CaptureInfo
	// Data is the raw captured packet bytes, starting at the Ethernet layer
	// (GetIP decodes it with layers.LayerTypeEthernet as the first layer).
	Data []byte
	// Err records any error associated with reading this packet.
	Err error
}

// GetIP decodes the IP layers and returns the source IP, destination IP,
// TTL/hop-limit, and payload length. It is a bit slow and does memory
// allocation, so avoid it on hot paths.
func (p *Packet) GetIP() (net.IP, net.IP, uint8, uint16, error) {
	// Decode the raw bytes starting from the Ethernet layer.
	decoded := gopacket.NewPacket(p.Data, layers.LayerTypeEthernet, gopacket.DecodeOptions{
		Lazy:                     true,
		NoCopy:                   true,
		SkipDecodeRecovery:       true,
		DecodeStreamsAsDatagrams: false,
	})

	if layer := decoded.Layer(layers.LayerTypeIPv4); layer != nil {
		v4, _ := layer.(*layers.IPv4)
		// For IPv4, the payload length is ip.Length minus the header length
		// (IHL is in 4-byte words).
		return v4.SrcIP, v4.DstIP, v4.TTL, v4.Length - uint16(4*v4.IHL), nil
	}
	if layer := decoded.Layer(layers.LayerTypeIPv6); layer != nil {
		v6, _ := layer.(*layers.IPv6)
		// In IPv6, the Length field is already the payload length.
		return v6.SrcIP, v6.DstIP, v6.HopLimit, v6.Length, nil
	}
	return nil, nil, 0, 0, ErrNoIPLayer
}

// GetPackets parses a raw pcap file image into a slice of Packets and
// updates pcap metrics (packet counts and connection duration, labeled by
// IP version). It returns the packets read before the first read error;
// a failure to parse the pcap header is returned as an error.
func GetPackets(data []byte) ([]Packet, error) {
	pcap, err := pcapgo.NewReader(strings.NewReader(string(data)))
	if err != nil {
		log.Print(err)
		return nil, err
	}

	// TODO: len(data)/18 provides much better estimate of number of packets.
	// len(data)/18 was determined by looking at bytes/packet in a few pcaps files.
	// The number seems too small, but perhaps the data is still compressed at this point.
	// However, it seems to cause mysterious crashes in sandbox, so
	// reverting to /1500 for now.
	packets := make([]Packet, 0, len(data)/1500)

	for {
		// BUG FIX: the original loop used ZeroCopyReadPacketData for the first
		// read, whose returned buffer is invalidated by the next read, yet
		// stored that slice in Packet.Data. Use the copying ReadPacketData
		// consistently so every Packet owns its bytes.
		pData, ci, readErr := pcap.ReadPacketData()
		if readErr != nil {
			// io.EOF terminates a well-formed capture; other read errors also
			// end the loop here, matching the original's behavior of keeping
			// whatever packets were read so far.
			break
		}
		packets = append(packets, Packet{Ci: ci, Data: pData})
	}

	// NOTE(review): the original had a dead `if err != nil` branch here — the
	// loop's error shadowed the outer err, which was always nil at this point.
	// That dead branch (and its "IP error" metric path for read failures) has
	// been removed rather than resurrected, to preserve observed behavior.
	if len(packets) > 0 {
		srcIP, _, _, _, ipErr := packets[0].GetIP()
		// TODO - eventually we should identify key local ports, like 443 and 3001.
		if ipErr != nil {
			metrics.WarningCount.WithLabelValues("pcap", "ip_layer_failure").Inc()
			metrics.PcapPacketCount.WithLabelValues("IP error").Observe(float64(len(packets)))
		} else {
			start := packets[0].Ci.Timestamp
			end := packets[len(packets)-1].Ci.Timestamp
			duration := end.Sub(start)
			// TODO add TCP layer, so we can label the stats based on local port value.
			if len(srcIP) == 4 {
				metrics.PcapPacketCount.WithLabelValues("ipv4").Observe(float64(len(packets)))
				metrics.PcapConnectionDuration.WithLabelValues("ipv4").Observe(duration.Seconds())
			} else {
				metrics.PcapPacketCount.WithLabelValues("ipv6").Observe(float64(len(packets)))
				metrics.PcapConnectionDuration.WithLabelValues("ipv6").Observe(duration.Seconds())
			}
		}
	} else {
		// No packets.
		metrics.PcapPacketCount.WithLabelValues("unknown").Observe(float64(len(packets)))
	}

	return packets, nil
}

//=====================================================================================
// PCAP Parser
//=====================================================================================
Expand Down Expand Up @@ -172,7 +88,7 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test

// Parse top level PCAP data and update metrics.
// TODO - add schema fields here.
_, _ = GetPackets(rawContent)
_, _ = tcpip.ProcessPackets(row.Parser.ArchiveURL, testName, rawContent)

// Insert the row.
if err := p.Put(&row); err != nil {
Expand Down
111 changes: 0 additions & 111 deletions parser/pcap_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package parser_test

import (
"io"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
Expand Down Expand Up @@ -134,111 +131,3 @@ func TestPCAPParser_GetUUID(t *testing.T) {
})
}
}

func TestIPLayer(t *testing.T) {
type test struct {
name string
fn string
packets int64
duration time.Duration
srcIP, dstIP string
TTL uint8
}
tests := []test{
{name: "retransmits", fn: "testdata/PCAP/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz",
packets: 336, duration: 15409174000, srcIP: "173.49.19.128"},
{name: "ipv6", fn: "testdata/PCAP/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz",
packets: 15, duration: 134434000, srcIP: "2a0d:5600:24:a71::1d"},
{name: "protocolErrors2", fn: "testdata/PCAP/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz",
packets: 5180, duration: 13444117000, srcIP: "2a0d:5600:24:a71::1d"},
}
for _, tt := range tests {
f, err := os.Open(tt.fn)
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(f)
if err != nil {
t.Fatal(err)
}
packets, err := parser.GetPackets(data)
if err != nil {
t.Fatal(err)
}
start := packets[0].Ci.Timestamp
end := packets[len(packets)-1].Ci.Timestamp
duration := end.Sub(start)
if duration != tt.duration {
t.Errorf("%s: duration = %v, want %v", tt.name, duration, tt.duration)
}
if len(packets) != int(tt.packets) {
t.Errorf("%s: expected %d packets, got %d", tt.name, tt.packets, len(packets))
}
srcIP, _, _, _, err := packets[0].GetIP()
if err != nil {
t.Fatal(err)
}
if srcIP.String() != tt.srcIP {
t.Errorf("%s: expected srcIP %s, got %s", tt.name, tt.srcIP, srcIP.String())
}
}
}

// TestPCAPGarbage verifies that malformed pcap input produces the expected
// errors rather than panicking.
func TestPCAPGarbage(t *testing.T) {
	// Valid little-endian pcap magic number followed by a truncated header.
	garbage := []byte{0xd4, 0xc3, 0xb2, 0xa1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
	if _, err := parser.GetPackets(garbage); err != io.ErrUnexpectedEOF {
		t.Fatal(err)
	}

	// Doubling the bytes yields a full-length header with nonsense version
	// fields, which the reader reports as an unknown major version.
	garbage = append(garbage, garbage...)
	if _, err := parser.GetPackets(garbage); err == nil || !strings.Contains(err.Error(), "Unknown major") {
		t.Fatal(err)
	}
}

// getTestFile reads a pcap fixture from testdata/PCAP, failing the
// benchmark on any error.
func getTestFile(b *testing.B, name string) []byte {
	// BUG FIX: the original used os.Open + ioutil.ReadAll and never closed
	// the file handle. ReadFile opens, reads, and closes in one call.
	data, err := ioutil.ReadFile(path.Join(`testdata/PCAP/`, name))
	if err != nil {
		b.Fatal(err)
	}
	return data
}

// Original single file RunParallel:
// Just packet decoding: BenchmarkGetPackets-8 8678 128426 ns/op 165146 B/op 381 allocs/op
// With IP decoding: BenchmarkGetPackets-8 4279 285547 ns/op 376125 B/op 1729 allocs/op

// Enhanced RunParallel: BenchmarkGetPackets-8 2311 514898 ns/op 1181138 B/op 1886 allocs/op

// BenchmarkGetPackets measures GetPackets throughput across three fixture
// files of different sizes, run in parallel.
func BenchmarkGetPackets(b *testing.B) {
	type tt struct {
		data    []byte
		numPkts int
	}
	tests := []tt{
		{getTestFile(b, "ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz"), 336},
		{getTestFile(b, "ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz"), 15},
		{getTestFile(b, "ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz"), 5180},
	}
	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		// BUG FIX: the original incremented a single counter shared by all
		// RunParallel goroutines without synchronization — a data race under
		// -race. Each goroutine now cycles through the fixtures with its own
		// local counter.
		i := 0
		for pb.Next() {
			test := tests[i%len(tests)]
			i++
			pkts, err := parser.GetPackets(test.data)
			if err != nil {
				b.Fatal(err)
			}
			if len(pkts) != test.numPkts {
				b.Errorf("expected %d packets, got %d", test.numPkts, len(pkts))
			}
		}
	})
}
51 changes: 48 additions & 3 deletions schema/pcap.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,62 @@
package schema

import (
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/m-lab/etl/row"
"github.com/m-lab/go/cloud/bqx"
)

// TcpStats summarizes TCP-level observations for one direction of a
// captured connection, including counts of anomalies seen while decoding.
type TcpStats struct {
	Packets   int64 // Total packets observed for this direction.
	Truncated int64 // Packets whose capture was truncated.

	OptionCounts []int64 // 16 counts, indicating how often each option type occurred.

	RetransmitPackets int64
	RetransmitBytes   int64

	Sacks int64 // Number of SACK blocks observed.

	ECECount      int64 // Packets with the ECE flag set.
	WindowChanges int64 // Observed changes in the receive window value.

	// Errors and anomalies
	BadSacks              int64 // Number of sacks with bad boundaries
	BadDeltas             int64 // Number of seqs and acks that were more than 1<<30 off from previous value.
	MissingPackets        int64 // Observations of packet sequence numbers that didn't match previous payload length.
	SendNextExceededLimit int64 // Number of times SendNext() returned a value that exceeded the receiver window limit.
	TTLChanges            int64 // Observed number of TTL values that don't match first IP header.
	SrcPortErrors         int64 // Observed number of source ports that don't match first IP header.
	DstPortErrors         int64 // Observed number of dest ports that don't match tcp.DstPort
	OtherErrors           int64 // Number of other errors that occurred.

}

// AlphaFields holds experimental pcap-derived connection summary fields,
// split into per-direction TcpStats plus connection-setup observations.
type AlphaFields struct {
	TruncatedPackets int64     `bigquery:"truncated_packets"`
	SynPacket        int64     `bigquery:"syn_packet" json:"syn_packet"`
	SynTime          time.Time `bigquery:"syn_time" json:"syn_time"`
	SynAckPacket     int64     `bigquery:"syn_ack_packet" json:"syn_ack_packet"`
	SynAckTime       time.Time `bigquery:"syn_ack_time" json:"syn_ack_time"`
	Packets          int64     `bigquery:"packets" json:"packets"`
	Sacks            int64     `bigquery:"sacks" json:"sacks"`
	IPAddrErrors     int64     `bigquery:"ip_addr_errors" json:"ip_addr_errors"` // Number of packets with IP addresses that don't match first IP header at all.
	WithoutTCPLayer  int64     `bigquery:"no_tcp_layer" json:"no_tcp_layer"`     // Number of packets with no TCP layer.

	// NOTE(review): these two fields have no bigquery/json tags, unlike the
	// rest of the struct — confirm whether default field naming is intended.
	LeftStats  TcpStats
	RightStats TcpStats
}

// PCAPRow describes a single BQ row of pcap (packet capture) data.
type PCAPRow struct {
ID string `bigquery:"id"`
Parser ParseInfo `bigquery:"parser"`
Date civil.Date `bigquery:"date"`
ID string `bigquery:"id" json:"id"`
Parser ParseInfo `bigquery:"parser" json:"parser"`
Date civil.Date `bigquery:"date" json:"date"`

// Alpha *AlphaFields `bigquery:"alpha" json:"alpha"`

// NOT part of struct schema. Included only to provide a fake annotator interface.
row.NullAnnotator `bigquery:"-"`
Expand Down
Binary file added tcp/optionDecode.cpu
Binary file not shown.
Loading