Skip to content

Commit

Permalink
Slight revision to group operation
Browse files Browse the repository at this point in the history
  • Loading branch information
bsneed committed Nov 19, 2023
1 parent 4af5a8b commit 8a16082
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 18 deletions.
18 changes: 7 additions & 11 deletions Sources/Segment/Analytics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,37 +214,33 @@ extension Analytics {
apply { plugin in
// we want to enter as soon as possible. waiting to do it from
// another queue just takes too long.
flushGroup.enter()
operatingMode.run(queue: configuration.values.flushQueue) {
if let p = plugin as? FlushCompletion {
// flush(group:completion:) handles the enter/leave.
p.flush { plugin in
// flush handles the groups enter/leave calls
p.flush(group: flushGroup) { plugin in
// we don't really care about the plugin value .. yet.
flushGroup.leave()
}
} else if let p = plugin as? EventPlugin {
flushGroup.enter()
// we have no idea if this will be async or not, assume it's sync.
p.flush()
flushGroup.leave()
} else {
// this is for plugins that don't implement flush.
flushGroup.leave()
}
}
}

// if we're not in server mode, we need to be notified when it's done.
// if we're not in sync mode, we need to be notified when it's done.
if let completion, operatingMode != .synchronous {
// set up our callback to know when the group has completed, if we're not
// in .server operating mode.
// in .synchronous operating mode.
flushGroup.notify(queue: configuration.values.flushQueue) {
DispatchQueue.main.async { completion() }
completion() //DispatchQueue.main.async { completion() }
}
}

flushGroup.leave() // matches our initial enter().

// if we ARE in server mode, we need to wait on the group.
// if we ARE in sync mode, we need to wait on the group.
// This effectively ends up being a `sync` operation.
if operatingMode == .synchronous {
flushGroup.wait()
Expand Down
2 changes: 1 addition & 1 deletion Sources/Segment/Plugins.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public protocol VersionedPlugin {
}

public protocol FlushCompletion {
func flush(completion: @escaping (DestinationPlugin) -> Void)
func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void)
}

// For internal platform-specific bits
Expand Down
14 changes: 12 additions & 2 deletions Sources/Segment/Plugins/SegmentDestination.swift
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,19 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
// unused .. see flush(group:completion:)
}

public func flush(completion: @escaping (DestinationPlugin) -> Void) {
public func flush(group: DispatchGroup, completion: @escaping (DestinationPlugin) -> Void) {
guard let storage = self.storage else { return }
guard let analytics = self.analytics else { return }
guard let httpClient = self.httpClient else { return }

// don't flush if analytics is disabled.
guard analytics.enabled == true else { return }

// enter for the high level flush, allow us time to run through any existing files..
group.enter()

// Read events from file system
guard let data = storage.read(Storage.Constants.events) else { return }
guard let data = storage.read(Storage.Constants.events) else { group.leave(); return }

eventCount = 0
cleanupUploads()
Expand All @@ -134,6 +137,8 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion

if pendingUploads == 0 {
for url in data {
// enter for this url we're going to kick off
group.enter()
analytics.log(message: "Processing Batch:\n\(url.lastPathComponent)")
// set up the task
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, batch: url) { (result) in
Expand All @@ -152,6 +157,8 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
self.cleanupUploads()
// call the completion
completion(self)
// leave for the url we kicked off.
group.leave()
}
// we have a legit upload in progress now, so add it to our list.
if let upload = uploadTask {
Expand All @@ -161,6 +168,9 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
} else {
analytics.log(message: "Skipping processing; Uploads in progress.")
}

// leave for the high level flush
group.leave()
}
}

Expand Down
33 changes: 33 additions & 0 deletions Sources/Segment/Utilities/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,36 @@ extension Optional: Flattenable {
}
}

class TrackingDispatchGroup: CustomStringConvertible {
internal let group = DispatchGroup()

var description: String {
return "DispatchGroup Enters: \(enters), Leaves: \(leaves)"
}

var enters: Int = 0
var leaves: Int = 0
var current: Int = 0

func enter() {
enters += 1
current += 1
group.enter()
}

func leave() {
leaves += 1
current -= 1
group.leave()
}

init() { }

func wait() {
group.wait()
}

public func notify(qos: DispatchQoS = .unspecified, flags: DispatchWorkItemFlags = [], queue: DispatchQueue, execute work: @escaping @convention(block) () -> Void) {
group.notify(qos: qos, flags: flags, queue: queue, execute: work)
}
}
12 changes: 8 additions & 4 deletions Tests/Segment-Tests/StressTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class StressTests: XCTestCase {
let event = "write queue 1: \(eventsWritten)"
analytics.track(name: event)
eventsWritten += 1
usleep(0001)
//usleep(0001)
RunLoop.main.run(until: Date.distantPast)
}
print("queue 1 wrote \(eventsWritten) events.")
queue1Done = true
Expand All @@ -82,7 +83,8 @@ class StressTests: XCTestCase {
let event = "write queue 2: \(eventsWritten)"
analytics.track(name: event)
eventsWritten += 1
usleep(0001)
//usleep(0001)
RunLoop.main.run(until: Date.distantPast)
}
print("queue 2 wrote \(eventsWritten) events.")
queue2Done = true
Expand All @@ -91,10 +93,12 @@ class StressTests: XCTestCase {
flushQueue.async {
while (ready == false) { usleep(1) }
var counter = 0
sleep(1)
//sleep(1)
RunLoop.main.run(until: Date(timeIntervalSinceNow: 1))
while (queue1Done == false || queue2Done == false) {
let sleepTime = UInt32.random(in: 1..<3000)
usleep(sleepTime)
//usleep(sleepTime)
RunLoop.main.run(until: Date(timeIntervalSinceNow: Double(sleepTime / 1000) ))
analytics.flush()
counter += 1
}
Expand Down

0 comments on commit 8a16082

Please sign in to comment.