
Self produce logs #72

Merged: 34 commits merged into openmsi:main on Jan 10, 2025

Conversation

tmcqueen-materials
Contributor

@Xarthisius @davidelbert : This is a WIP set of modifications to OpenMSIStream to enable self-production of its own logs. I am submitting it now as a draft to get feedback on the overall approach, and to start getting all the CI style feedback, etc.

The approach taken here consists of two parts:

(1) In the first part, plumbing for separate production of log messages to Kafka is added (see the sketch after part (2)). This is essentially a copy-and-paste of the heartbeats plumbing, but simpler because individual class overrides are not required. I decided to keep it separate, instead of a single combined heartbeat/logs production, since I can see cases where you would want one but not the other.

(2) In the second part, a single global, thread-safe implementation of a logging and warnings handler is plumbed in during initial package import. I am not a fan of the single global instance, but it is the only way to make sure we capture all warnings early on (the warnings machinery involves a single piece of global state; see the Python warnings documentation), and it simplifies other things we might want to do later (e.g. enabling warnings output to go to the normal OpenMSI logging facilities).
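
For concreteness, here is a minimal sketch of what the two parts could look like, assuming confluent_kafka as the underlying client; the class and function names are illustrative only, not the identifiers used in this PR:

```python
# Hypothetical sketch: a logging.Handler that forwards records to a Kafka
# topic (part 1), plus a single global handler installed at import time that
# also captures warnings (part 2). Names here are illustrative, not the PR's.
import logging
import threading

from confluent_kafka import Producer


class KafkaLogHandler(logging.Handler):
    """Forward each log record to a Kafka topic."""

    def __init__(self, bootstrap_servers, topic):
        super().__init__()
        self._producer = Producer({"bootstrap.servers": bootstrap_servers})
        self._topic = topic

    def emit(self, record):
        try:
            self._producer.produce(self._topic, self.format(record).encode())
            self._producer.poll(0)  # serve delivery callbacks without blocking
        except Exception:
            self.handleError(record)


_INSTALL_LOCK = threading.Lock()
_GLOBAL_HANDLER = None


def install_global_log_handler(bootstrap_servers, topic):
    """Install one shared handler and route warnings through logging.

    Called once during package import; the warnings state is process-global,
    which is why a single instance is used.
    """
    global _GLOBAL_HANDLER
    with _INSTALL_LOCK:
        if _GLOBAL_HANDLER is None:
            _GLOBAL_HANDLER = KafkaLogHandler(bootstrap_servers, topic)
            logging.captureWarnings(True)  # warnings.warn() -> "py.warnings" logger
            logging.getLogger("py.warnings").addHandler(_GLOBAL_HANDLER)
            logging.getLogger().addHandler(_GLOBAL_HANDLER)
```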

One other thing: how does this all interact with encrypted messages? Does anyone know? KafkaCrypto really needs a single KafkaCryptoStore backing object, for example, and at first glance it looks like the heartbeat one might trample over the one used for regular message production.

@davidelbert
Contributor

Thanks for this @tmcqueen-materials. I agree with your first-part choice because I am pretty sure we will use heartbeats a lot and detailed logging only when we have something to debug, so keeping them separate seems right.

In the second part, I agree with the approach of something centralized and consistent rather than each module or component having its own, context-specific logger (like logging.getLogger(name)). Context-specific loggers allow a lot of control, but I would think they might miss things and would certainly be more work to keep consistent. Doing something else, like creating a decoupled, dedicated warnings handler, might separate concerns between warnings and logging and allow flexibility to escalate warnings to exceptions in specific cases, but it would also increase complexity by introducing another layer of handling and doesn't seem necessary.
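
Just to spell out the two patterns being weighed, in plain standard-library terms (a sketch, not OpenMSIStream code):

```python
import logging

# Centralized pattern: a single handler on the root logger receives records
# from every named logger, because records propagate up the logger hierarchy.
central_handler = logging.StreamHandler()
logging.getLogger().addHandler(central_handler)
logging.getLogger().setLevel(logging.INFO)

# Context-specific pattern: each module still gets its own named logger, but
# without central configuration each one must be managed individually.
log = logging.getLogger(__name__)  # e.g. "openmsistream.some_module"
log.info("this record reaches the central handler via propagation")
```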

In short, I think you've picked the pragmatic, effective solution. If there are more complex needs then thinking about a hierarchical or context-specific design might be worth it, but hopefully Kafka doesn't throw us a lot of problems!

At least that's my two cents as the least development savvy person on this thread!

@tmcqueen-materials
Contributor Author

@davidelbert : thanks. I guess the reason I was considering whether heartbeats+logs should be a single item is that a heartbeat message is really just a special type of log message (namely, one that gives a timestamp and associated status, e.g. the number of bytes processed). And OpenMSIStream already includes the capability to adjust the logging level to one's desired level. But if you all are happy with two separate ones, I am happy. Thanks!
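
To make that concrete, a heartbeat could in principle be framed as just another structured log record (a hypothetical illustration, not how this PR implements it):

```python
import json
import logging
import time

heartbeat_log = logging.getLogger("openmsistream.heartbeats")  # name is illustrative


def emit_heartbeat(n_msgs_produced, n_bytes_produced):
    """A heartbeat expressed as an ordinary INFO-level log message."""
    payload = {
        "timestamp": time.time(),
        "n_msgs_produced": n_msgs_produced,
        "n_bytes_produced": n_bytes_produced,
    }
    heartbeat_log.info("HEARTBEAT %s", json.dumps(payload))
```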

@tmcqueen-materials
Contributor Author

tmcqueen-materials commented Jan 8, 2025

@davidelbert @Xarthisius : The residual test issues here come from the fact that the CI is using the wrong config files (see #70), as well as from the separate S3 access token issue. This should be ready to review.

Responding to my question above about encryption of logs and heartbeats, the answer is that they are not encrypted; they are sent in the clear. This presents a confidentiality problem. But it also reveals a more serious issue: since the heartbeat producer/consumer Python object is not the same as the one carrying actual data, when encountering issues such as confluentinc/librdkafka#4916, heartbeats can continue even if the actual data-carrying connection is broken (this is true with or without encryption). These can be fixed -- would you like that as part of this PR, or as a new PR?
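
One possible mitigation, sketched here only for discussion (it is not part of this PR, and assumes confluent_kafka as the client), is to tie heartbeat emission to the health of the data-carrying producer, for example by recording fatal client errors and checking before each heartbeat:

```python
import threading

from confluent_kafka import Producer

_data_connection_broken = threading.Event()


def _error_cb(err):
    # Invoked by the client for global errors; remember fatal ones.
    if err.fatal():
        _data_connection_broken.set()


data_producer = Producer({
    "bootstrap.servers": "localhost:9092",  # illustrative
    "error_cb": _error_cb,
})


def maybe_send_heartbeat(heartbeat_producer, topic, payload):
    """Skip heartbeats once the data producer has hit a fatal error."""
    if _data_connection_broken.is_set():
        return
    heartbeat_producer.produce(topic, payload)
    heartbeat_producer.poll(0)
```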

@tmcqueen-materials tmcqueen-materials marked this pull request as ready for review January 8, 2025 20:09
@Xarthisius
Contributor

Xarthisius commented Jan 8, 2025

> S3 access token issue.

CircleCI project setup for this repo has "Pass secrets to builds from forked pull requests" disabled. That's why your tests fail. I'm not sure why they're built and run under openmsi/openmsistream, though. PRs from my fork run under xarthisius/openmsistream on CircleCI, and I was able to provide my own credentials.

> These can be fixed -- would you like that as part of this PR, or as a new PR?

This is already a big PR; I'd vote for deferring that for now.

@tmcqueen-materials
Contributor Author

tmcqueen-materials commented Jan 8, 2025

@Xarthisius : Thanks. I will defer the encryption and heartbeat functionality fixes to a new PR. Maybe David can fix the CircleCI configuration setting you mention so we can get clean tests before merging this?

@Xarthisius
Contributor

Xarthisius commented Jan 9, 2025

One of the tests related to this PR times out and hangs:

(venv) xarth@shakuras ~/codes/xarthisius/openmsistream/test/test_scripts (self-produce-logs) $ python3 -m unittest test_data_file_directories_encrypted_logs.py -v
[2025-01-09 01:46:50,267] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-01-09 01:46:50,388] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-01-09 01:46:50,489] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2025-01-09 01:46:50,691] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
test_encrypted_upload_and_download_logs_kafka (test_data_file_directories_encrypted_logs.TestDataFileDirectoriesEncryptedLogs)
Test sending and receiving encrypted messages with logs ... [TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:46:58] Will delete existing output location at /home/xarth/codes/xarthisius/openmsistream/test/test_encrypted_upload_and_download_logs_kafka_output_python_3_10_12
[KafkaConfigFileParser 2025-01-08 19:47:05] /home/xarth/codes/xarthisius/openmsistream/venv/lib/python3.10/site-packages/kafkacrypto/cryptostore.py:78: ResourceWarning: unclosed file <_io.TextIOWrapper name='/home/xarth/codes/xarthisius/openmsistream/venv/lib/python3.10/site-packages/openmsistream/kafka_wrapper/config_files/testing_node_2/testing_node_2.config' mode='r+' encoding='UTF-8'>
  self.__file = None

[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:47:15] Waiting to reconstruct files; will timeout after 300 seconds...
[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:47:15] 	0 messages read after 0.00 seconds....
[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:47:20] 	0 messages read after 5.01 seconds....
.....
[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:52:00] 	0 messages read after 285.30 seconds....
[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:52:05] 	0 messages read after 290.30 seconds....
[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:52:10] 	0 messages read after 295.31 seconds....
%4|1736387531.549|MAXPOLL|rdkafka#consumer-11| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 14ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%4|1736387531.570|MAXPOLL|rdkafka#consumer-10| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 35ms (adjust max.poll.interval.ms for long-running message processing): leaving group
[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:52:15] Quitting download thread after reading 0 messages; will timeout after 30 seconds....

[TestDataFileDirectoriesEncryptedLogs 2025-01-08 19:52:35] Quitting upload thread; will timeout after 90 seconds
FAIL

======================================================================
FAIL: test_encrypted_upload_and_download_logs_kafka (test_data_file_directories_encrypted_logs.TestDataFileDirectoriesEncryptedLogs)
Test sending and receiving encrypted messages with logs
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/xarth/codes/xarthisius/openmsistream/test/test_scripts/test_data_file_directories_encrypted_logs.py", line 112, in test_encrypted_upload_and_download_logs_kafka
    raise exc
  File "/home/xarth/codes/xarthisius/openmsistream/test/test_scripts/test_data_file_directories_encrypted_logs.py", line 87, in test_encrypted_upload_and_download_logs_kafka
    self.assertTrue(reco_fp.is_file())
AssertionError: False is not true

----------------------------------------------------------------------
Ran 1 test in 356.050s

FAILED (failures=1)

^CException ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
KeyboardInterrupt: 

@tmcqueen-materials
Contributor Author

tmcqueen-materials commented Jan 9, 2025

@Xarthisius : Yes, that is because of the issue you identified in your not-yet-merged #70. If you look at the immediately preceding test (which has one of the test topics deliberately wrong, but works without the config file update of this PR), that test passes: https://app.circleci.com/pipelines/github/openmsi/openmsistream/702 .

Edited to add:

The technical detail: the test that is timing out is test_encrypted_upload_and_download_logs_kafka, which is one of the new tests added with this PR. It times out because the existing kafka_wrapper/config_files/testing_node_2 KafkaCrypto configuration does not specify the new kafka broker topic used by this test, test_oms_encrypted_logs, so the consumer is unable to decrypt the data chunk messages produced. This PR includes an updated testing_node_2 configuration that enables this test to complete successfully (as tested locally by me), but due to the issue described in #70, that updated file is not used by the CI tests.

I was thinking this could be left as-is, since #70 will eventually address the underlying problem. If you prefer, I could revert the last commit of this PR so that test_encrypted_upload_and_download_logs_kafka uses the broker topic test_oms_encrypted_heartbeats instead (which works with the existing kafka_wrapper/config_files/testing_node_2 KafkaCrypto configuration), and then include the change back to test_oms_encrypted_logs in the follow-up PR to this one (where it will work, because an updated KafkaCrypto test configuration will be in place).

@Xarthisius
Contributor

We could merge my PR and rebase this one. It should solve the issue. It will require a nasty rebase, though.

@tmcqueen-materials
Contributor Author

@davidelbert : How would you like to proceed with this? The rebase of this one on top of #70 shouldn't be too bad, as they are essentially orthogonal changes except in a couple of small places.

@davidelbert
Contributor

@tmcqueen-materials and @Xarthisius, #70 is merged, so you guys are set to rebase, I think.

Tyrel M. McQueen added 4 commits January 10, 2025 08:01
… did not apply the changes to the binary KafkaCrypto configs of testing_node_2). So regenerate, and while here, add some documentation of how to do it again in the future.
@tmcqueen-materials
Contributor Author

@davidelbert : This one is ready to merge. Thanks!

@davidelbert davidelbert merged commit 84e2d9a into openmsi:main Jan 10, 2025
6 checks passed