In [1]:
%%HTML


In [5]:
def id2bit(ls: list):
 """
 Converts a list of indices to a binary representation (bit array).

 Given a list of indices, this function creates a binary list where each index in
 the input list is set to 1 in the output list, and all other positions are set to 0.
 The output list is then reversed before returning.

 Args:
 ls (list): A list of indices where each index will be set to 1 in the output list.

 Returns:
 list: A list of binary values (0s and 1s), where each index in the input list corresponds
 to a 1 in the output binary list, and all other indices are 0.
 """
 if len(ls) == 0:
 return [0, 0, 0, 0, 0, 0, 0, 0] # Return a default 8-bit array
 aa = [0 for i in range(max(ls) + 1)]
 for i in ls:
 aa[i] = 1
 return aa[::-1]


def bit2id(ls: list):
 """
 Converts a binary list (bit array) to a list of indices where the value is 1.

 This function iterates over the binary list and returns a list of indices where the binary value is 1.
 The list is reversed before returning.

 Args:
 ls (list): A list of binary values (0s and 1s).

 Returns:
 list: A list of indices where the corresponding binary value in the input list is 1.
 """
 ls = ls[::-1]
 aa = []
 for i in range(len(ls)):
 if ls[i] == 1:
 aa.append(i)
 return aa[::-1]


def XOR(*args):
 """
 Performs bitwise XOR on a sequence of values.

 This function takes any number of arguments and performs the XOR operation iteratively
 across all the input values.

 Args:
 *args: A sequence of values (typically integers) on which the XOR operation will be applied.

 Returns:
 int: The result of applying the XOR operation across all input values.
 """
 result = 0
 for arg in args:
 result ^= arg
 return result


class LFSR:
 """
 A class representing a Linear Feedback Shift Register (LFSR).

 This class models an LFSR, which generates a sequence of bits based on an initial state
 and a feedback polynomial. The LFSR can be clocked to generate subsequent bits in the sequence.

 Attributes:
 seq (list): The current state (bit sequence) of the LFSR.
 taps (list): The positions of the taps used for feedback calculation.

 Methods:
 clock(): Shifts the bits in the LFSR and computes the new bit based on the feedback.
 """

 def __init__(self, start, poly):
 """
 Initializes an LFSR with a start state and a feedback polynomial.

 Args:
 start (list): The initial state of the LFSR, represented as a list of bits (0s and 1s).
 poly (list): A list representing the feedback polynomial, with 1s indicating the taps.

 Raises:
 ValueError: If the length of the start state does not match the polynomial length minus one.
 """
 self.seq = start
 self.taps = bit2id(poly[:-1]) # ignore the output tap (final bit)

 if len(self.seq) != len(poly) - 1:
 raise ValueError("Polynomial and start value length mismatch")

 def clock(self):
 """
 Advances the LFSR by one clock cycle.

 This method computes the feedback bit by XORing the bits at the tap positions,
 shifts the state, and adds the feedback bit to the beginning of the sequence.
 """
 feedback = XOR(*[self.seq[bit] for bit in self.taps])
 self.seq = [feedback] + self.seq[:-1]


class A51:
 """
 A class representing the A5/1 stream cipher.

 A51 is a stream cipher used in GSM encryption. It combines three LFSRs and uses a majority rule
 to control which LFSRs are clocked. The output is the XOR of the last bits of the LFSRs.

 Attributes:
 lfsrs (list): A list of LFSR instances.
 clock_bits (list): The bit positions used for clocking each LFSR.
 lfsr_count (int): The number of LFSRs used in the cipher.

 Methods:
 majority(*bits): Computes the majority bit from a list of bits.
 clock(): Advances the cipher and returns the next bit of the keystream.
 """

 def __init__(self, lfsrs, clock_bits):
 """
 Initializes the A51 cipher with a list of LFSRs and their clocking bits.

 Args:
 lfsrs (list): A list of LFSR instances used to generate the keystream.
 clock_bits (list): A list indicating the bit positions in each LFSR to use for majority voting.
 """
 self.lfsrs = lfsrs
 self.clock_bits = clock_bits
 self.lfsr_count = len(clock_bits)

 def majority(self, *bits):
 """
 Computes the majority bit from a sequence of bits.

 This method determines the majority (1 or 0) from the given bits. If the number of 1s
 is greater than or equal to half of the number of LFSRs, the majority bit is 1; otherwise, it is 0.

 Args:
 *bits: A sequence of bits (typically 0s and 1s) for which the majority is to be determined.

 Returns:
 int: The majority bit (0 or 1).
 """
 ones = sum(i for i in bits if i == 1)
 if ones >= self.lfsr_count / 2:
 majority_bit = 1
 else:
 majority_bit = 0
 return majority_bit

 def clock(self):
 """
 Advances the A51 cipher by one clock cycle and generates the next keystream bit.

 This method computes the majority bit from the specified clocking positions of the LFSRs,
 clocks the LFSRs if necessary, and outputs the XOR of the last bits of each LFSR as the next
 bit of the keystream.

 Returns:
 int: The next bit in the keystream generated by the A51 cipher.
 """
 majority = self.majority(
 *[self.lfsrs[i].seq[self.clock_bits[i]] for i in range(self.lfsr_count)]
 )
 for i in range(self.lfsr_count):
 if self.lfsrs[i].seq[self.clock_bits[i]] == majority:
 self.lfsrs[i].clock()
 out = XOR(*[int(i.seq[-1]) for i in self.lfsrs])
 return out


# Example usage
lf1 = LFSR(start=[1, 0, 1, 1], poly=id2bit([4, 1]))
lf2 = LFSR(start=[0, 1, 1, 1], poly=id2bit([4, 1]))
lf3 = LFSR(start=[1, 0, 1, 0], poly=id2bit([4, 1]))
a51 = A51(lfsrs=[lf1, lf2, lf3], clock_bits=[1, 2, 0])

# Generate a keystream of 10 bits
stream = [a51.clock() for i in range(10)]
stream

[1, 0, 0, 0, 0, 1, 0, 1, 1, 0]

In [6]:
import os


def write2txt_file(bitstream, filename):
 """
 Writes a bitstream (string of '0's and '1's) to a text file.

 This function opens a text file in append mode and writes the provided bitstream to it.

 Args:
 bitstream (str): A string of '0's and '1's representing the bitstream to be written.
 filename (str): The path to the text file where the bitstream will be written.
 """
 with open(filename, "a") as f: # Open in append mode to continue writing
 f.write(bitstream)


def write2bin_file(bitstream, filename):
 """
 Writes a bitstream (string of '0's and '1's) to a binary file.

 This function converts the bitstream into bytes, pads it to ensure it's a multiple of 8 bits,
 and then writes it to a binary file in append mode.

 Args:
 bitstream (str): A string of '0's and '1's representing the bitstream to be written.
 filename (str): The path to the binary file where the bitstream will be written.
 """
 byte_list = []

 # Pad the bitstream if it's not a multiple of 8
 padding = (8 - (len(bitstream) % 8)) % 8
 bitstream += "0" * padding # Add extra '0's to make the length a multiple of 8

 for i in range(0, len(bitstream), 8):
 byte = bitstream[i : i + 8]
 byte_list.append(int(byte, 2)) # Convert 8 bits to an integer (byte)

 # Append the bytes to the binary file
 with open(filename, "ab") as f: # 'ab' mode to append to the binary file
 f.write(bytearray(byte_list))


def gen_bit_stream(data: dict, target_size: int, file_path: str):
 """
 Generates a keystream using the A51 cipher and writes it to a file.

 This function initializes the LFSRs based on the provided data, generates a keystream
 using the A51 cipher, and writes the generated bits to a text file or binary file
 in chunks. It keeps track of the current size of the output file and prints progress
 at each 10% interval.

 Args:
 data (dict): A dictionary containing information about the LFSRs, including their
 start values, polynomials, and clock positions.
 target_size (int): The target size of the file in bytes. The function will stop once
 this size is reached.
 file_path (str): The path to the output file where the generated bitstream will be written.
 """
 # Initialize the LFSRs and A51 cipher
 lfsrs = [LFSR(start=i["start"], poly=i["poly"]) for i in data]
 a51 = A51(lfsrs=lfsrs, clock_bits=[i["clock"] for i in data])

 current_size = 0
 bitstream_chunk = "" # Chunk of bits to write periodically
 chunk_size = (
 10000 # Number of bits to generate at a time (can adjust for performance)
 )
 progress_interval = target_size // 10 # 1/10th of the target size (100 MB)
 next_progress_checkpoint = progress_interval

 # Generate bits until the target file size is reached
 while current_size < target_size:
 # Generate bits in chunks
 for _ in range(chunk_size):
 bitstream_chunk += str(a51.clock())

 # Write the chunk to file
 write2txt_file(bitstream_chunk, file_path)

 # Clear the chunk and update the current file size
 bitstream_chunk = ""
 current_size = os.path.getsize(file_path)

 # Check if the file size has crossed the 1/10th checkpoint
 if current_size >= next_progress_checkpoint:
 print(
 f"File size crossed {round(next_progress_checkpoint / (1024 * 1024), 2)} MB"
 )
 next_progress_checkpoint += (
 progress_interval # Update to next 10% checkpoint
 )

 print(f"File generation complete: {file_path} (target)")

In [7]:
data = [
 {"start": [0, 1, 0, 1, 1], "poly": id2bit([5, 2, 0]), "clock": 2},
 {"start": [1, 0, 0, 1, 0], "poly": id2bit([5, 4, 3, 1, 0]), "clock": 3},
 {"start": [0, 1, 1, 0, 0], "poly": id2bit([5, 4, 2, 1, 0]), "clock": 2},
]
gen_bit_stream(data, target_size=1 * 1024**2, file_path="mine_gen_100MB.txt")

File size crossed 0.1 MB
File generation complete: mine_gen_100MB.txt (target)
