From 77f5f992963bdd1ef91b3136ad703acd2762871f Mon Sep 17 00:00:00 2001 From: Jinam Shah <40169508+jinamshah@users.noreply.github.com> Date: Mon, 5 Oct 2020 14:11:00 +0530 Subject: [PATCH] Added script to send messages to sqs in parallel (#165) * Added script to send messages to sqs in parallel * Update README.md --- README.md | 1 + send_sqs_messages_in_parallel/README.md | 19 ++++++++++ .../requirements.txt | 1 + send_sqs_messages_in_parallel/send_to_sqs.py | 37 +++++++++++++++++++ 4 files changed, 58 insertions(+) create mode 100644 send_sqs_messages_in_parallel/README.md create mode 100644 send_sqs_messages_in_parallel/requirements.txt create mode 100644 send_sqs_messages_in_parallel/send_to_sqs.py diff --git a/README.md b/README.md index fd24ea8..6d7d795 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,7 @@ So far, the following projects have been integrated to this repo: |[Random_Email_Generator](Random_Email_Generator)|[Shubham Garg](https://github.com/shub-garg)| |[Tambola_Ticket_Generator](Tambola_Ticket_Generator)|[Amandeep_Singh](https://github.com/Synster)| | [Py_Cleaner](Py_Cleaner) | [Abhishek Dobliyal](https://github.com/Abhishek-Dobliyal) +|[Send messages to sqs in parallel](send_sqs_messages_in_parallel)|[Jinam Shah](https://github.com/jinamshah)| ## How to use : diff --git a/send_sqs_messages_in_parallel/README.md b/send_sqs_messages_in_parallel/README.md new file mode 100644 index 0000000..4513f62 --- /dev/null +++ b/send_sqs_messages_in_parallel/README.md @@ -0,0 +1,19 @@ +# Send messages to sqs in parallel + +A python script that will take a file with a lot of messages to be sent to sqs and will send them to a queue in highly parallelized manner. +
+This is especially useful for batch processing requests. +
+Works when ```aws configure``` is done correctly or iam role is attached to the machine + +## Requirement + +```bash +pip3 install boto3 +``` + +#Usage +Go to Upload_files_to_s3 directory and add your folder's name you want to upload to s3 and then run upload_files_to_s3.py as below: +```bash +$ python3 send_to_sqs.py +``` diff --git a/send_sqs_messages_in_parallel/requirements.txt b/send_sqs_messages_in_parallel/requirements.txt new file mode 100644 index 0000000..170ca55 --- /dev/null +++ b/send_sqs_messages_in_parallel/requirements.txt @@ -0,0 +1 @@ +boto3==1.10.50 \ No newline at end of file diff --git a/send_sqs_messages_in_parallel/send_to_sqs.py b/send_sqs_messages_in_parallel/send_to_sqs.py new file mode 100644 index 0000000..db2961f --- /dev/null +++ b/send_sqs_messages_in_parallel/send_to_sqs.py @@ -0,0 +1,37 @@ +import boto3 +import json +import sys +from multiprocessing import Pool + +file = '/path/to/file_with_records_as_jsonl' +queue_url = 'your-queue-url' +region = 'your-region' +processor_count = int(sys.argv[1]) + + +def send_message(data): + sqs_client = boto3.client('sqs', region_name=region) + sqs_client.send_message_batch( + QueueUrl=queue_url, + Entries=[{'MessageBody': json.dumps(x)} for x in data] + ) + + +def main(file): + temp = [] + total_records_sent = 0 + + with open(file) as f: + data = [json.loads(line) for line in f] + batched_data = [] + for i in range(0, len(data), int(len(data)/processor_count)): + batched_data.append(data[i:i + int(len(data)/processor_count)]) + for _ in Pool(processes=processor_count).imap_unordered(send_message, + batched_data): + temp.append(_) + for x in temp: + total_records_sent += x + + +if __name__ == "__main__": + main(file)