-
Notifications
You must be signed in to change notification settings - Fork 1
92 lines (76 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class KafkaAT07 < Formula
desc "Publish-subscribe messaging rethought as a distributed commit log"
homepage "https://kafka.apache.org/"
url "https://dl.bintray.com/weblinc/mirrors/kafka-0.7.2.tar.gz"
sha256 "6f6c9f6de136feaf49dab892fd54afce33939653470990c8073d7e3db1a0cb38"
depends_on "zookeeper"
depends_on :java => "1.8"
def pour_bottle?; false; end
def install
data = var/"lib"
inreplace "config/server.properties",
"log.dir=/tmp/kafka-logs", "log.dir=#{data}/kafka-logs"
libexec.install "libs"
prefix.install "bin"
bin.env_script_all_files(libexec/"bin", Language::Java.java_home_env("1.8"))
Dir["#{bin}/*.sh"].each { |f| mv f, f.to_s.gsub(/.sh$/, "") }
mv "config", "[email protected]"
etc.install "[email protected]"
libexec.install_symlink etc/"kafka" => "config"
# create directory for kafka stdout+stderr output logs when run by launchd
(var+"log/kafka").mkpath
end
def plist; <<~EOS
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>#{plist_name}</string>
<key>WorkingDirectory</key>
<string>#{HOMEBREW_PREFIX}</string>
<key>ProgramArguments</key>
<array>
<string>#{opt_bin}/kafka-server-start</string>
<string>#{etc}/[email protected]/server.properties</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardErrorPath</key>
<string>#{var}/log/kafka/kafka_output.log</string>
<key>StandardOutPath</key>
<string>#{var}/log/kafka/kafka_output.log</string>
</dict>
</plist>
EOS
end
test do
ENV["LOG_DIR"] = "#{testpath}/kafkalog"
(testpath/"kafka").mkpath
cp "#{etc}/kafka/zookeeper.properties", testpath/"kafka"
cp "#{etc}/kafka/server.properties", testpath/"kafka"
inreplace "#{testpath}/kafka/zookeeper.properties", "#{var}/lib", testpath
inreplace "#{testpath}/kafka/server.properties", "#{var}/lib", testpath
begin
fork do
exec "#{bin}/zookeeper-server-start #{testpath}/kafka/zookeeper.properties > #{testpath}/test.zookeeper-server-start.log 2>&1"
end
sleep 15
fork do
exec "#{bin}/kafka-server-start #{testpath}/kafka/server.properties > #{testpath}/test.kafka-server-start.log 2>&1"
end
sleep 30
system "#{bin}/kafka-topics --zookeeper localhost:2181 --create --if-not-exists --replication-factor 1 --partitions 1 --topic test > #{testpath}/kafka/demo.out 2>/dev/null"
pipe_output("#{bin}/kafka-console-producer --broker-list localhost:9092 --topic test 2>/dev/null", "test message")
system "#{bin}/kafka-console-consumer --zookeeper localhost:2181 --topic test --from-beginning --max-messages 1 >> #{testpath}/kafka/demo.out 2>/dev/null"
system "#{bin}/kafka-topics --zookeeper localhost:2181 --delete --topic test >> #{testpath}/kafka/demo.out 2>/dev/null"
ensure
system "#{bin}/kafka-server-stop"
system "#{bin}/zookeeper-server-stop"
sleep 10
end
assert_match(/test message/, IO.read("#{testpath}/kafka/demo.out"))
end
end