feat: Add CLPRemoteHandler to handle Multipart Upload to AWS S3 bucket #50

Open · wants to merge 12 commits into base: main
Conversation

IreneLime commented on Jan 5, 2025

This PR adds CLPRemoteHandler to support S3 multipart uploading of compressed log files. Users can upload existing log files or upload log files that are handled by CLPFileHandler.

Description

  • Implement initiate_upload: configures the path on the remote and the object key, then creates the multipart upload instance.
    • Uses the helper function _remote_log_naming to standardize the log name on the remote storage, differentiating logs by Year-Month-Day-HourMinuteSecond.
  • Implement multipart_upload: loops through each 5 MB segment of the file and calls _upload_part (see the sketch below). Performs file rotation once the number of segments exceeds 10,000 (an AWS limit: one multipart upload can have at most 10,000 parts).
    • Uses the helper function _upload_part to find the starting position, upload a 5 MB segment to the remote storage, and obtain the segment's metadata from the remote.
  • Implement complete_upload: uploads the last segment and appends it to the file (the last segment is exempt from the 5 MB minimum size, as stated by AWS), then completes the upload instance.
  • Supports CLPLogLevelTimeout: the timeout function uploads a segment of the provided log file (if it reaches 5 MB).
    • Note: the latest clp-ffi-py is required; otherwise, running the code will raise a tz_local library error.
  • Implements close() from CLPStreamHandler to complete the multipart upload after the stream closes.
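For illustration, a minimal sketch of the part-upload loop described above, using standard boto3 calls; the function and variable names are illustrative and not the PR's actual code (file rotation at 10,000 parts is omitted for brevity):

# Hypothetical sketch of the multipart-upload flow; names and structure are illustrative.
import boto3
from pathlib import Path

MIN_PART_SIZE = 5 * 1024 * 1024  # 5 MB: S3's minimum size for every part except the last

def multipart_upload_sketch(bucket: str, obj_key: str, log_path: Path) -> None:
    client = boto3.client("s3")
    upload_id = client.create_multipart_upload(Bucket=bucket, Key=obj_key)["UploadId"]
    parts = []
    try:
        with open(log_path, "rb") as f:
            part_number = 1
            while True:
                data = f.read(MIN_PART_SIZE)
                if not data:
                    break
                response = client.upload_part(
                    Bucket=bucket,
                    Key=obj_key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=data,
                )
                parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
                part_number += 1
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=obj_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        client.abort_multipart_upload(Bucket=bucket, Key=obj_key, UploadId=upload_id)
        raise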

User-Side Configuration Requirements

Please run `aws configure` on the command line to configure the AWS API key, and ensure that the S3 bucket has been created.

Validation performed

The following tests were performed manually.

  • Test that uploading an existing file succeeds.
    Test method: Run the following code and verify functionality.
# Import paths are assumed, based on the module added in this PR (src/clp_logging/remote_handlers.py).
from pathlib import Path

from clp_logging.remote_handlers import CLPRemoteHandler

s3_bucket = "s3_bucket_name"
log_path = "local-path-to-log"
remote_handler = CLPRemoteHandler(s3_bucket)
remote_handler.initiate_upload(Path(log_path))
remote_handler.multipart_upload()
remote_handler.complete_upload()
  • Test errors (bucket does not exist, local file does not exist).
  • Test remote file rotation during multipart upload.
    Test method: Set the segment limit to a small number and check whether the remaining segments are uploaded to a new file.
  • Test that initiating an upload while another file is uploading raises an error.
    Test methods:
    • Initiate a new upload while another upload is in progress.
    • Perform multipart_upload or complete_upload without any upload in progress.
  • Test that an upload is performed during timeout.
    Test method: Run the following code and verify functionality.
# Import paths are assumed; CLPRemoteHandler comes from the module added in this PR.
import logging
from pathlib import Path

from clp_logging.handlers import CLPFileHandler, CLPLogLevelTimeout
from clp_logging.remote_handlers import CLPRemoteHandler

s3_bucket = "s3_bucket_name"
uploader = CLPRemoteHandler(s3_bucket)
loglevel_timeout = CLPLogLevelTimeout(lambda: uploader.timeout(Path("example.clp.zst")))
clp_handler = CLPFileHandler(Path("example.clp.zst"), loglevel_timeout=loglevel_timeout)
logger = logging.getLogger(__name__)
logger.addHandler(clp_handler)

for i in range(10000):
    logger.info("example warning")
clp_handler.close()

Performed an automated test that injects a simulated network failure during upload to verify that the upload is aborted.

Unit tests will be added in future PRs to keep this PR's length manageable. A detailed test plan will be provided.

Please let me know if there is any feedback or if you have suggestions. Thanks!

Summary by CodeRabbit

  • New Features
     - Enhanced log management now supports direct cloud uploads, ensuring reliable storage and efficient delivery of log data.
     - Offers configurable options including data compression, timestamp format, and timezone settings for improved log handling.


coderabbitai bot commented Jan 5, 2025

Walkthrough

A new handler class, CLPS3Handler, has been introduced in the logging module. This class extends the base handler to encode log messages in CLP IR format and upload them directly to an S3 bucket. The changes include methods for initializing compression, writing log messages to a local buffer, flushing this buffer via multipart upload to S3, and finalizing the upload process upon closure. Necessary imports and type hint adjustments have also been incorporated.

Changes

File: src/clp_logging/handlers.py
Summary: Added new class CLPS3Handler (extending CLPBaseHandler) with methods __init__, init, _write, _flush, and close to encode logs and manage multipart S3 uploads; added required imports and updated type hints.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant H as CLPS3Handler
    participant B as Local Buffer
    participant S as S3 Bucket

    C->>H: Sends log message
    H->>B: Write encoded message to buffer
    alt Buffer size below threshold
        B-->>H: Continue buffering
    else Buffer reaches threshold
        H->>B: Trigger flush
        H->>S: Upload log part (multipart upload)
        S-->>H: Upload acknowledgement
    end
    C->>H: Closes logging handler
    H->>S: Finalise multipart upload
    S-->>H: Completion confirmation
    H-->>C: Handler closed

IreneLime changed the title from "Add CLPRemoteHandler to handle Multipart Upload to AWS S3 bucket" to "feat: Add CLPRemoteHandler to handle Multipart Upload to AWS S3 bucket" on Jan 5, 2025
coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
src/clp_logging/remote_handlers.py (5)

25-27: Consider storing S3 resource and client in a shared session.
Using a shared boto3.Session can help maintain consistent configuration and mitigate unexpected credential issues, particularly in multi-threaded or multi-process scenarios.
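A minimal sketch of what a shared session could look like (the attribute names are assumptions; the boto3 calls are standard):

import boto3

# One Session carries the credential/region configuration for both interfaces.
session = boto3.Session()
s3_resource = session.resource("s3")
s3_client = session.client("s3")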


34-38: Allow for a customisable chunk size.
This static 5MB chunk is the minimum S3 part size, which is valid but may not be optimal for all use cases. Consider allowing configuration so that users can choose suitable part sizes, which may improve throughput or performance for larger/smaller logs in different environments.
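One possible shape, purely illustrative and not the handler's actual constructor, for exposing the part size while still enforcing S3's 5 MiB floor:

S3_MIN_PART_SIZE = 5 * 1024 * 1024

class PartSizeConfig:
    # Illustrative only: a configurable part size validated against the S3 minimum.
    def __init__(self, part_size: int = S3_MIN_PART_SIZE) -> None:
        # Every part except the last must be at least 5 MiB; larger parts mean fewer API calls.
        if part_size < S3_MIN_PART_SIZE:
            raise ValueError("part_size must be at least 5 MiB")
        self.part_size = part_size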


49-65: Consider adding a prefix for remote log names.
The _remote_log_naming method currently relies solely on a timestamp and the file count. Adding a static or configurable prefix (e.g. application_name/) can help organize logs by application, service, or environment.


115-117: Synchronize concurrent upload attempts.
initiate_upload raises an exception if an upload is already in progress, but concurrency control must also be enforced if multiple threads or processes attempt uploading. Consider a thread lock or other concurrency mechanism to ensure safe usage.
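A small sketch of one way to serialize initiation across threads (class and method names are illustrative, not part of the PR):

import threading

class UploadGuard:
    # Illustrative helper: rejects a second upload while one is in progress.
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._in_progress = False

    def begin(self) -> None:
        with self._lock:
            if self._in_progress:
                raise RuntimeError("An upload is already in progress")
            self._in_progress = True

    def end(self) -> None:
        with self._lock:
            self._in_progress = False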


190-230: Enhance error handling for failed multipart completion.
If complete_multipart_upload fails on the final step, the code may orphan the partially uploaded segments. You might want to implement a retry or a fallback mechanism to handle this scenario, avoiding silent data loss.
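For example, a hedged sketch of completing the upload with an explicit abort on failure (standard boto3/botocore calls; the function and parameter names are illustrative):

from botocore.exceptions import ClientError

def complete_or_abort(client, bucket: str, key: str, upload_id: str, parts: list) -> None:
    # Abort on failure so partially uploaded parts do not linger (and incur storage costs).
    try:
        client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
    except ClientError:
        client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise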


📥 Commits

Reviewing files that changed from the base of the PR and between 6c32c14 and 85cac30.

📒 Files selected for processing (1)
  • src/clp_logging/remote_handlers.py (1 hunks)
🔇 Additional comments (2)
src/clp_logging/remote_handlers.py (2)

138-181: Validate logic for extremely large log files.
The code restarts the multipart upload when 10,000 parts are reached. Ensure that re-initializing the upload seamlessly works for logs well beyond this boundary. Large file handling may require further testing, especially under high throughput or multi-day runs.


243-246: Ensure graceful order of operations when closing.
super().close() is invoked prior to finalising the multipart upload. If the superclass’s close logic throws an exception, the upload might remain incomplete. Consider a try-except flow or reversing the order to complete the upload first in certain scenarios if that is safer for your use case.
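A sketch of one safer ordering, shown against Python's base logging.Handler (illustrative only; complete_upload is a placeholder for the real completion step):

import logging

class SketchHandler(logging.Handler):
    def complete_upload(self) -> None:
        pass  # placeholder for the real multipart completion

    def close(self) -> None:
        try:
            self.complete_upload()  # finalise the multipart upload first
        finally:
            super().close()  # the parent's close logic still runs even if completion fails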

Comment on lines 67 to 77
def _upload_part(self) -> Dict[str, int | str]:
    if self.log_path is None:
        raise ValueError("No input file.")

    upload_data: bytes
    # Read the latest version of the file
    try:
        with open(self.log_path, "rb") as file:
            file.seek(self.multipart_upload_config["pos"])
            upload_data = file.read(self.multipart_upload_config["size"])
    except FileNotFoundError as e:

🛠️ Refactor suggestion

Handle empty or incomplete data reads more gracefully.
If the file has no new data, _upload_part may attempt to upload an empty byte array, which could be wasteful or unneeded. Ensure that logic checks for empty data before proceeding with the upload.
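A minimal illustration of guarding against an empty read before calling upload_part (names mirror the excerpt above but are illustrative):

from pathlib import Path
from typing import Optional

def read_next_part(log_path: Path, pos: int, size: int) -> Optional[bytes]:
    # Return None when there is no new data, so the caller can skip the S3 upload_part call.
    with open(log_path, "rb") as f:
        f.seek(pos)
        data = f.read(size)
    return data or None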

class CLPRemoteHandler(CLPFileHandler):
    """
    Handles CLP file upload and comparison to AWS S3 bucket. Configuration of
    AWS access key is required. Run command `aws configure`.
junhaoliao (Member) commented on Jan 5, 2025

Configuration of AWS access key is required. Run command aws configure.

If I'm not wrong, the credentials are stored in ~/.aws by aws configure. Let's try to make the credentials directly configurable in our handler's configuration (probably via some __init__ parameter, just like how the HTTPHandler requires a credentials parameter). Only when such credentials are not provided do we fall back to the credentials in ~/.aws.
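A rough sketch of that fallback behaviour, assuming boto3 (the function and parameter names are illustrative, not the handler's actual API):

from typing import Optional
import boto3

def make_s3_client(
    aws_access_key_id: Optional[str] = None,
    aws_secret_access_key: Optional[str] = None,
):
    # Use explicit credentials when given; otherwise fall back to ~/.aws or environment variables.
    if aws_access_key_id and aws_secret_access_key:
        session = boto3.Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
    else:
        session = boto3.Session()
    return session.client("s3")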

junhaoliao (Member) commented:
For test automation, we can look into setting up an S3 server with https://github.com/minio/minio

LinZhihao-723 (Member) left a comment

Left some comments regarding high-level design problems.
I do have more comments about the coding details, but let's fix the design issues first.
Please try to fix the linter and ensure the GitHub workflows pass, thanks!
Feel free to ask questions offline.


def __init__(
    self,
    s3_bucket: str,
Member commented:

Initiating an API that only takes a bucket is not user-friendly. Users should have more flexibility to configure the ingestion path.

self.log_name = log_path.name
self.upload_in_progress = True
timestamp: datetime.datetime = datetime.datetime.now()
self.remote_folder_path = f"logs/{timestamp.year}/{timestamp.month}/{timestamp.day}"
Member commented:

This hardcoded log path is not desired, especially without any formal documentation.

IreneLime (Author) replied:

Hi @LinZhihao-723, thanks for the comments. The log path is currently provided as user input to the initiate_upload and timeout functions. I'll make it a class argument.

Member replied:
Can we conclude why we need a path first before taking any actions? (as discussed in other comments)

def set_obj_key(self, obj_key: str) -> None:
    self.obj_key = obj_key

def initiate_upload(self, log_path: Path) -> None:
Member commented:

I don't really get how this API is supposed to work.

  • Users are not supposed to call this API directly since this class is a logging handler.
  • I don't see the reason to not include this in the object initialization. It's always a good practice to use RAII style.
  • From an API design perspective, this function doesn't really make sense: why take a path as an input in this method? Is this functionality really related to a logging handler? A logging handler should only be responsible for logs emitted through its exposed APIs, not an arbitrary file.

from clp_logging.handlers import CLPFileHandler


class CLPRemoteHandler(CLPFileHandler):
Member commented:

Again, the major issue is that the implementation in this PR doesn't really look like a logging handler, but it is derived from a logging handler class. We need to sort out the following questions first:

  • How to automate uploading with reasonable APIs?
  • How to properly handle logs through logging handler APIs?
  • How to backup logs in the local disk to prevent uploading failure?

IreneLime (Author) commented:
Design and Structure
The CLPS3Handler class sends logging output to streams in CLP IR encoding and uploads the streams to the specified AWS S3 bucket.

class CLPS3Handler(s3_bucket)
Returns a new instance of the CLPS3Handler class. Configures the provided AWS S3 bucket and calls init() to initialize the output stream and local buffer. Calls _remote_log_naming() to set up the object key. After that, the instance calls create_multipart_upload() from the AWS API to start an upload.

_remote_log_naming()
Configures the file path on the S3 bucket based on logs/year/month/day/.
Configures the remote file name as YY-MM-DD-HourMinuteSec_log.
The file extension is currently set to ".clp.zst" when compression is enabled; otherwise ".clp" is used.

_write()
Encodes the log event.
Flushes the output stream to a local io buffer.
Calls the _flush() method to perform the upload.
If the local io buffer exceeds a predetermined buffer size (5MB), the local buffer is cleared and the upload index is incremented so that new buffer content is written to the next part of the multipart upload.

_flush()
Calls upload_part from the AWS API (a rough sketch is shown below).
Records the upload status (part number and checksum values).
Aborts the multipart upload if there is any exception.

close()
Writes the EOF character to the end of the output stream.
Calls the _flush() method to perform the last upload.
Calls complete_multipart_upload from the AWS API.
Closes the output stream.
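For reference, a rough sketch of the _flush() behaviour described above, using standard boto3 calls; the function, attribute, and parameter names are illustrative, not the actual implementation:

import io

def flush_buffer(client, bucket: str, key: str, upload_id: str,
                 buffer: io.BytesIO, part_number: int, parts: list) -> None:
    # Upload the buffered bytes as one part, record its metadata, and abort on failure.
    try:
        response = client.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            Body=buffer.getvalue(),
        )
        parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
    except Exception:
        client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise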

Inheritance Diagram (image)

Initialization Sequence (image)

Workflow Diagram (image)

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
src/clp_logging/handlers.py (5)

801-807: Docstring is concise
The docstring briefly describes the class purpose. If applicable, consider expanding it to clarify usage constraints, such as AWS credentials requirements and minimum AWS permissions.


808-819: Consider error handling during initial stream write
While this method correctly initializes a Zstandard compressor and writes a preamble, consider wrapping the initial write operation in a try-except to handle potential stream write errors gracefully.


854-866: Zero-padding for logs path
Currently, the month/day folder path omits leading zeros. Although functional, consider a zero-padded folder structure to maintain consistent sorting (e.g., ‘2025/02/04’) and avoid potential confusion.
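For example, a zero-padded path can be produced with strftime (illustrative; the variable name is assumed):

import datetime

now = datetime.datetime.now()
remote_folder_path = now.strftime("logs/%Y/%m/%d")  # e.g. "logs/2025/02/04"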


868-879: Avoid forcing a flush on every write
Calling _flush() for each log event can degrade performance if logging is frequent. Consider selectively flushing only when the buffer reaches the threshold or on a fixed interval.

Example of a conditionally triggered flush:

 def _write(self, loglevel: int, msg: str) -> None:
     if self.closed:
         raise RuntimeError("Stream already closed")
     clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)
     self.ostream.write(clp_msg)
-    self._flush()
+    if self.local_buffer.tell() >= self.buffer_size:
+        self._flush()
     if self.local_buffer.tell() >= self.buffer_size:
         ...

918-951: Remove or guard print(self.uploaded_parts)
Using direct print in production code is often left over from debugging or can clutter logs. If you need this output, consider gating it behind a debug flag or using a logger.

- print(self.uploaded_parts)
+ # If debug printing is needed:
+ # logger.debug(f"Uploaded parts: {self.uploaded_parts}")

📥 Commits

Reviewing files that changed from the base of the PR and between 85cac30 and 065c28c.

📒 Files selected for processing (1)
  • src/clp_logging/handlers.py (2 hunks)
🔇 Additional comments (4)
src/clp_logging/handlers.py (4)

13-13: Imports look appropriate
These newly added imports (typing, datetime, io, boto3) appear to be used by the new class and methods. No issues observed.

Also applies to: 19-21


799-800: No business logic or content
These lines do not appear to contain meaningful functionality and can be safely ignored.


820-852: Validate multipart upload creation result
This constructor calls create_multipart_upload but does not handle the scenario where “UploadId” might be missing or the call itself might fail. It might be worthwhile to wrap this in a try-except—to manage errors proactively—and log user-friendly error messages or abort cleanly.

Do you want me to craft a verification script to search the codebase for usage of “create_multipart_upload” and check for robust error handling?


880-915: Repeated partial part overwrites
The same part number is reused until the buffer surpasses the 5MB threshold. This repeatedly overwrites part data. If this is intended behaviour, it is acceptable, but it may lead to unneeded S3 traffic. Confirm this logic meets your expectations for how partial chunks are handled.
