Python/cryptography/lfsr_bit_stream.ipynb
Bibekananda Hati 27332fa728 cryptography
2024-11-20 22:52:43 +05:30

246 lines
7.8 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "65571230-ccb3-41a1-a361-31097b31bc5b",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<style>.container{width:100%}</style>\n"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%%HTML\n",
"<style>.container{width:100%}</style>"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "f12511c2-01b6-45f4-b6b6-1bfe1dc93631",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[1, 0, 0, 0, 0, 1, 0, 1, 1, 0]"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def id2bit(ls:list):\n",
" if(len(ls)==0):\n",
" return [0,0,0,0, 0,0,0,0] # Return a default 8-bit array\n",
" aa = [0 for i in range(max(ls)+1)]\n",
" for i in ls:\n",
" aa[i] = 1\n",
" return aa[::-1]\n",
"\n",
"def bit2id(ls:list):\n",
" ls = ls[::-1]\n",
" aa = []\n",
" for i in range(len(ls)):\n",
" if(ls[i] == 1):\n",
" aa.append(i)\n",
" return aa[::-1]\n",
" \n",
"def XOR(*args):\n",
" result = 0\n",
" for arg in args:\n",
" result ^= arg\n",
" return result\n",
" \n",
"class LFSR:\n",
" def __init__(self,start,poly):\n",
" self.seq = start\n",
" self.taps = bit2id(poly[:-1]) # ignore the output tap (final bit)\n",
"\n",
" if len(self.seq) != len(poly) - 1:\n",
" raise ValueError(\"Polynomial and start value length mismatch\")\n",
"\n",
" def clock(self):\n",
" # print(self.seq) \n",
" feedback = XOR(*[self.seq[bit] for bit in self.taps])\n",
" \n",
" self.seq = [feedback] + self.seq[:-1]\n",
"\n",
" \n",
"class A51:\n",
" def __init__(self,lfsrs,clock_bits):\n",
" self.lfsrs = lfsrs\n",
" self.clock_bits = clock_bits\n",
" self.lfsr_count = len(clock_bits)\n",
" def majority(self, *bits):\n",
" ones = sum(i for i in bits if i==1)\n",
" if ones >= self.lfsr_count / 2:\n",
" majority_bit = 1\n",
" else:\n",
" majority_bit = 0\n",
" return majority_bit\n",
" \n",
" def clock(self):\n",
" majority = self.majority(*[self.lfsrs[i].seq[self.clock_bits[i]] for i in range(self.lfsr_count)])\n",
" for i in range(self.lfsr_count):\n",
" if(self.lfsrs[i].seq[self.clock_bits[i]] == majority):\n",
" self.lfsrs[i].clock()\n",
" out = XOR(*[int(i.seq[-1]) for i in self.lfsrs])\n",
" return out\n",
" \n",
"lf1 = LFSR(start=[1,0,1,1],poly=id2bit([4,1]))\n",
"lf2 = LFSR(start=[0,1,1,1],poly=id2bit([4,1]))\n",
"lf3 = LFSR(start=[1,0,1,0],poly=id2bit([4,1]))\n",
"a51 = A51(lfsrs=[lf1,lf2,lf3],clock_bits=[1,2,0])\n",
"stream = [a51.clock() for i in range(10)]\n",
"stream"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "182b2a83-d083-4296-a3bc-4d4f14dd8724",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"def write2txt_file(bitstream, filename):\n",
" \"\"\"Writes a bitstream (string of '0's and '1's) to a text file.\"\"\"\n",
" with open(filename, 'a') as f: # Open in append mode to continue writing\n",
" f.write(bitstream)\n",
"\n",
"def write2bin_file(bitstream, filename):\n",
" byte_list = []\n",
" \n",
" # Pad the bitstream if it's not a multiple of 8\n",
" padding = (8 - (len(bitstream) % 8)) % 8\n",
" bitstream += '0' * padding # Add extra '0's to make the length a multiple of 8\n",
" \n",
" for i in range(0, len(bitstream), 8):\n",
" byte = bitstream[i:i+8]\n",
" byte_list.append(int(byte, 2)) # Convert 8 bits to an integer (byte)\n",
" \n",
" # Append the bytes to the binary file\n",
" with open(filename, 'ab') as f: # 'ab' mode to append to the binary file\n",
" f.write(bytearray(byte_list))\n",
"\n",
"def gen_bit_stream(data:dict,target_size,file_path):\n",
" lfsrs = [LFSR(start=i[\"start\"],poly=i[\"poly\"]) for i in data]\n",
" a51 = A51(lfsrs=lfsrs,clock_bits=[i[\"clock\"] for i in data])\n",
" \n",
" # filename = 'bitstream_output_1GB.bin'\n",
" # target_size = 1 * 1024 * 1024 * 1024 # 1 GB in bytes\n",
" current_size = 0\n",
"\n",
" bitstream_chunk = \"\" # Chunk of bits to write periodically\n",
" chunk_size = 10000 # Number of bits to generate at a time (can adjust for performance)\n",
" progress_interval = target_size // 10 # 1/10th of the target size (100 MB)\n",
" next_progress_checkpoint = progress_interval\n",
" \n",
" while current_size < target_size:\n",
" # Generate bits in chunks\n",
" for _ in range(chunk_size):\n",
" bitstream_chunk += str(a51.clock())\n",
"\n",
" # Write chunk to file\n",
" # write2bin_file(bitstream_chunk, filename)\n",
" write2txt_file(bitstream_chunk, file_path)\n",
" \n",
" # Clear the chunk and update the current file size\n",
" bitstream_chunk = \"\"\n",
" current_size = os.path.getsize(file_path)\n",
" # Check if the file size has crossed the 1/10th checkpoint\n",
" if current_size >= next_progress_checkpoint:\n",
" print(f\"File size crossed {round(next_progress_checkpoint / (1024 * 1024),2)} MB\")\n",
" next_progress_checkpoint += progress_interval # Update to next 10% checkpoint\n",
"\n",
" \n",
"\n",
" print(f\"File generation complete: {file_path} (target)\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "ebf2b473-4277-4b99-9935-96802dc52488",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File size crossed 0.1 MB\n",
"File size crossed 0.2 MB\n",
"File size crossed 0.3 MB\n",
"File size crossed 0.4 MB\n",
"File size crossed 0.5 MB\n",
"File size crossed 0.6 MB\n",
"File size crossed 0.7 MB\n",
"File size crossed 0.8 MB\n",
"File size crossed 0.9 MB\n",
"File size crossed 1.0 MB\n",
"File generation complete: mine_gen_100MB.txt (target)\n"
]
}
],
"source": [
"data = [\n",
" {\"start\": [0, 1, 0, 1, 1], \"poly\": id2bit([5, 2, 0]), \"clock\": 2},\n",
" {\"start\": [1, 0, 0, 1, 0], \"poly\": id2bit([5, 4, 3, 1, 0]), \"clock\": 3},\n",
" {\"start\": [0, 1, 1, 0, 0], \"poly\": id2bit([5, 4, 2, 1, 0]), \"clock\": 2}\n",
"]\n",
"gen_bit_stream(data, target_size=1*1024**2, file_path=\"mine_gen_100MB.txt\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "80acd699-63bc-4391-ac05-8166a843b1ca",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "01038d4c-ab7f-486c-b300-a95187f4cd76",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}