“Solving” Tetris with Apache Spark

Published

June 7, 2023

GitHub Repository

Overview

This project implements an algorithm in Apache Spark that solves the stacking portion of Tetris for a specified board width. Since the piece order in Tetris is randomized, the challenge of stacking pieces lies in knowing what stack configuration can handle the widest variety of possible piece sequences before a hole is created in the stack. Starting from a given stack configuration, the algorithm uses recursion to compute the average number of pieces that a player will be able to place under optimal play before creating a hole. Once a table of these average values is computed, the optimal strategy is to always place the next piece such that the average for the new stack configuration is maximized.

The walkthrough.ipynb notebook in the GitHub repo (linked above) contains a description of the algorithm along with a sample calculation for a small Tetris board. The full source code is contained in the analyze.py, generate.py, and play.py Python modules, which can be run in the environment generated by requirements.txt.

A non-interactive version of the Jupyter notebook is reproduced below.

The Stacking Game

Tetris is arguably the most popular video game ever made, having seen countless releases on virtually every piece of computer hardware imaginable. The game was originally developed by Alexey Pajitnov in 1984, and requires the player to stack a set of falling pieces called “tetrominos” into a continuous tower, such that there are no gaps between the pieces. Points are scored by filling in an entire row of blocks, at which point those blocks are removed from the board and the height of the tower is reduced. The game ends when the player is unable to clear rows fast enough and allows the tower to reach the top of the board.

The challenge of Tetris comes from the shapes of the tetrominos: the seven distinct shapes that can be built by joining four equally-sized square blocks edge to edge, with mirror-image pairs counted as separate pieces. These pieces can be rotated and translated as they fall, with the player needing to interlock them into a continuous stack in order to clear rows. Since the order of the pieces is random and unknown to the player, an optimal strategy must consider not only the placement of the current piece, but also how the next piece (and all subsequent pieces) can be incorporated into the tower. The overarching goal is to ensure that each piece fits with the previous pieces to form a continuous surface devoid of holes, so that rows can be cleared. This is the challenge that we wish to “solve” using an algorithm.

The word “solve” is presented here in quotes because our goal is not to find the optimal strategy in its entirety. Instead, we seek only an optimal strategy for stacking pieces until it is impossible to prevent a hole from forming somewhere in the tower. In a real game of Tetris this does not cause play to end, as there are ways of repairing these holes and continuing the stack. By focusing solely on continuous piece stacking, we drastically reduce the complexity of the problem, making it possible to find strong solutions with reasonable computational resources.

Problem Complexity

To pin down the overall complexity of this problem, we must define the height and width of the area where we are allowed to stack pieces (referred to as the board). In standard Tetris the board is 20 blocks high and 10 blocks across, but for maximum scoring it is common to initially leave the left-most column unfilled while stacking. This means that the maximum effective play area is 20 x 9, which can fit 21^9 = 794,280,046,581 differently-shaped piece stacks inside of it. In general, the number of possible stacks is given by (height + 1)^width, since each column can have a number of blocks in the range [0, height], and there are width distinct columns with their own independent configurations.

For each tower configuration, we will need to consider all of the possible ways that the tetrominos can be stacked on top without creating any holes. There is no simple formula for this quantity, as it is highly dependent on the specific contours of the stack. A more concrete quantity to consider is the maximum number of pieces that can be placed in a sequence. Since no piece can be placed when the board is completely full, and since each piece has four blocks, we can stack at most height * width / 4 pieces before completely running out of room. The full 20 x 9 board, for example, can have at most 45 pieces in any sequence, although most towers will terminate significantly earlier than this.
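
To put concrete numbers on both quantities, the short calculation below (plain Python, not part of the repo) evaluates them for the full 20 x 9 play area and for the narrower boards considered later:

height = 20
for width in (4, 6, 9):
    num_stacks = (height + 1)**width  # each of the width columns can hold 0 to height blocks
    max_pieces = height*width // 4    # each tetromino occupies exactly four blocks
    print(f"{height} x {width}: {num_stacks:,} possible stacks, at most {max_pieces} pieces per sequence")
20 x 4: 194,481 possible stacks, at most 20 pieces per sequence
20 x 6: 85,766,121 possible stacks, at most 30 pieces per sequence
20 x 9: 794,280,046,581 possible stacks, at most 45 pieces per sequence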

Description of the Algorithm

Since Tetris is a game with incomplete information (the sequence of pieces), we will seek a strategy that maximizes the average performance over all sequences. Our measure of performance will be the number of pieces stacked before a hole is formed, with an optimal strategy maximizing this number. Such a strategy can be found via recursion by computing a set of sequence-limited averages, up to the maximum number that can fit on the board. The optimal strategy is then to place each piece such that you end up with a tower configuration that has the highest expected stack length, conditioned on the remaining number of pieces that can be stacked before filling the board.

To compute the required set of averages, we can follow an algorithm with the following steps for each tower configuration:

  1. Compute the valid placements for all seven tetrominos, taking into account every possible translation and rotation.
  2. Compute the probability that the next piece in the sequence will be possible to stack without creating a hole. This is the average number of placed pieces for a sequence of length 1.
  3. For each piece type, find the tower configuration with the highest average computed in step 2 that can be reached by placing that piece type, and then compute a new average by taking the mean of one plus these maximal averages (a piece that cannot be placed without a hole contributes zero to the mean). This is the average number of placed pieces for a sequence of length 2.
  4. Repeat step 3 n more times, maximizing the averages generated from the previous iteration in the cycle. This gives the average number of placed pieces for a sequence of length n + 2. The value of n should be chosen so that n + 2 reaches the maximum number of pieces that can fit on the board. A plain-Python sketch of this recursion is given below.
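
Before turning to Spark, here is a minimal plain-Python sketch of the recursion (the function and the connections mapping are illustrative assumptions, not code from the repo). It assumes the valid placements have already been tabulated as a dictionary mapping (board id, piece) to the list of hole-free boards reachable by placing that piece:

def compute_averages(connections, board_ids, pieces, max_length):
    # avgs[board_id] holds the expected number of pieces placed under optimal
    # play for the current sequence length, starting from that configuration.
    avgs = {board_id: 0.0 for board_id in board_ids}  # sequence length 0
    for _ in range(max_length):
        new_avgs = {}
        for board_id in board_ids:
            total = 0.0
            for piece in pieces:
                reachable = connections.get((board_id, piece), [])
                if reachable:  # the piece can be stacked without creating a hole
                    total += 1 + max(avgs[b] for b in reachable)
            new_avgs[board_id] = total/len(pieces)  # each of the seven pieces is equally likely
        avgs = new_avgs
    return avgs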

Each portion of this algorithm can be carried out using the DataFrame API in Apache Spark.

Computing the connections

For step 1 of the algorithm, we need to compute all of the different ways that a starting tower configuration can transition to a different configuration after placing a piece. To do this, we start by defining two different ways to represent a Tetris board. Since we are only interested in towers without holes, we can describe each configuration uniquely by listing out the height of each column. This array-based representation can be compressed into a single integer (which we will call the board id) by adding together powers of one more than the board height. For a standard height of 20, we construct an integer that is 21^0 = 1 times the height of the first column, 21^1 = 21 times the height of the second column, 21^2 = 441 times the height of the third column, and so on. This ensures that each board has a single unique integer that represents it. The code which converts between these two representations is given below.

import math

def get_board_columns(board_id, height, width):
    # Decode a board id into its list of column heights by reading off
    # the digits of the id in base (height + 1).
    board = []
    for x in range(1, width + 1):
        height_power = (height + 1)**(x - 1)
        shift_to_ones = math.floor(board_id / height_power)
        digit = shift_to_ones % (height + 1)
        board.append(digit)
    return board

def get_board_id(board, height):
    # Encode a list of column heights into a single integer by treating
    # each height as a digit in base (height + 1).
    width = len(board)
    board_id = 0
    for x in range(1, width + 1):
        power = (height + 1)**(x - 1)
        board_id += board[x - 1]*power
    return board_id

test_id = 145672560876
test_board = get_board_columns(test_id, 20, 9)
inverted_id = get_board_id(test_board, 20)
print(f"Map id to columns and then back to id: {test_id} -> {test_board} -> {inverted_id}.")
Map id to columns and then back to id: 145672560876 -> [3, 6, 8, 7, 4, 10, 18, 17, 3] -> 145672560876.

Note that the two representations are useful in different situations: the board id works great as a unique key to specify a given board, but is very difficult to visualize. The column heights, by contrast, are more verbose but somewhat easier to understand at a glance.
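
Since the column-height representation is still fairly abstract, a tiny helper like the one below (illustrative only, not part of the repo) can render a board as text, which makes small examples easy to read:

def draw_board(columns, height):
    # Render a column-height list as rows of text, top row first.
    rows = []
    for y in range(height, 0, -1):
        rows.append("".join("#" if h >= y else "." for h in columns))
    return "\n".join(rows)

print(draw_board([1, 0, 2, 2], 4))
....
....
..##
#.##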

To compute the different connections between tower configurations, we will start with a Spark DataFrame that is populated with all possible board ids using the range function. Each row will then be expanded into its height-based representation using a set of column functions that mimic get_board_columns from the previous code block. To determine where pieces can be placed without creating holes, we must match the contour of the tower with the bottom of the piece. The possible contours for each of the seven tetrominos are returned by the following function:

def get_piece_maps():
    # Each entry maps a piece name to a list of (surface, heights) pairs, one per
    # distinct orientation: "surface" is the contour of column-height differences the
    # piece must rest on, and "heights" is the number of blocks it adds to each column.
    pieces = {}
    pieces["i"] = [([0, 0, 0], [1, 1, 1, 1]), ([], [4])]
    pieces["t"] = [([-1, 1], [1, 2, 1]), ([-1], [1, 3]), ([0, 0], [1, 2, 1]), ([1], [3, 1])]
    pieces["sq"] = [([0], [2, 2])]
    pieces["zr"] = [([0, 1], [1, 2, 1]), ([-1], [2, 2])]
    pieces["zl"] = [([-1, 0], [1, 2, 1]), ([1], [2, 2])]
    pieces["lr"] = [([1, 0], [2, 1, 1]), ([-2], [1, 3]), ([0, 0], [1, 1, 2]), ([0], [3, 1])]
    pieces["ll"] = [([0, -1], [1, 1, 2]), ([0], [1, 3]), ([0, 0], [2, 1, 1]), ([2], [3, 1])]
    return pieces

Each piece is represented by a list of pairs, one per distinct orientation. The first element of each pair lists the relative changes in column height that the tower surface must have for the piece to rest on it without creating a hole, and the second element lists the number of blocks the piece adds to each column it covers. By looping through all positions, we can compute all valid placements and thus all connections to other tower configurations.
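
As a quick sanity check (a plain-Python sketch with an illustrative helper, not code from the repo), the snippet below applies the flat-bottomed orientation of the “t” piece to a small set of column heights:

pieces = get_piece_maps()

def try_place(columns, surface, heights, pos):
    # Attempt to place a piece at column index pos (0-based); return the new
    # column heights, or None if the piece does not match the tower surface.
    window = columns[pos:pos + len(heights)]
    diffs = [b - a for (a, b) in zip(window[:-1], window[1:])]
    if diffs != surface:
        return None
    new_columns = list(columns)
    for (i, added) in enumerate(heights):
        new_columns[pos + i] += added
    return new_columns

surface, heights = pieces["t"][2]  # flat-bottomed T orientation: ([0, 0], [1, 2, 1])
print(try_place([3, 3, 3, 0], surface, heights, 0))
[4, 5, 4, 0]

The following Spark routine performs the same matching for every board id, piece, and position, for an arbitrary height and width (resources permitting):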

import pyspark.sql.functions as sfn
import pyspark.sql.types as stype

from src import analyze

def generate_connections(height, width, spark_session):

    # This function finds all mappings from one tower configuration to
    # another that occur when placing a Tetris piece. The resulting table
    # has columns "id", "new_id", and "piece", with "new_id" being the tower
    # configuration that can be reached by adding "piece" to the board represented
    # by "id".

    max_id = (height + 1)**width 
    ids = spark_session.range(max_id) # DataFrame populated by all possible board ids 
    all_cols = get_board_columns(height, width) # Spark-column counterpart of get_board_columns above: a dict of column expressions keyed by column position
    boards = ids.withColumns(all_cols) # DataFrame now has board heights as well as ids
    schema = stype.StructType([
        stype.StructField("id", stype.LongType(), False), 
        stype.StructField("new_id", stype.LongType(), True), 
        stype.StructField("piece", stype.StringType(), False)])
    all_connections = spark_session.createDataFrame([], schema = schema) # Empty dataframe that will be iteratively updated
    for (piece_name, maps) in analyze.pieces.items():
        for (surface, piece_heights) in maps:
            piece_width = len(piece_heights)
            if piece_width <= width: # Pieces wider than the board cannot be placed
                for pos in range(1, width - piece_width + 2): # Iterate over all positions that the piece can be stacked at
                    col_list = [sfn.col(f"{x + pos}") for x in range(piece_width)] # List columns that are intersected by piece
                    diff_list = [col_2 - col_1 for (col_1, col_2) in zip(col_list[:-1], col_list[1:])] # Get relative heights of columns
                    board_surfaces = boards.select("*", *diff_list)
                    matched_boards = board_surfaces
                    for (col, diff) in zip(diff_list, surface):
                        matched_boards = matched_boards.filter(col == diff) # Piece can only be placed if it exactly matches the tower surface
                    raised_cols = {f"{x + pos}":(col + added) for (x, (col, added)) in enumerate(zip(col_list, piece_heights))}
                    raised_boards = matched_boards.select(*boards.columns).withColumns(raised_cols) # Generate new board after adding piece
                    valid_boards = raised_boards
                    for col in col_list:
                        valid_boards = valid_boards.filter(col <= height) # Remove tower configurations that are too tall
                    new_id_col = get_board_id(height, width) # Spark-column counterpart of get_board_id: recomputes the board id from the raised columns
                    connections = valid_boards.select("id", new_id_col.alias("new_id"), sfn.lit(piece_name).alias("piece"))
                    all_connections = all_connections.union(connections)
    return all_connections

This function assembles the full connections table out of subtables corresponding to each piece and placement position. While the number of tower configurations and thus connections will scale exponentially with the board width, the operations themselves are all narrow and can be done in a distributed manner without any shuffle operations.

Computing the averages

After the connections are tabulated, the rest of the algorithm focuses on computing the average number of pieces that can be placed before the formation of a hole is unavoidable. Since our goal is to choose a strategy that maximizes this average, we will want to stack pieces such that we always move to a new configuration that itself has a high average. For a finite number of pieces, we can always construct an algorithm that starts with a piece sequence of length one and then recursively computes optimal averages for a sequence of length n using results from length n - 1. The key observation is that the optimal average for length n is avg_n(board) = (1/7) * sum over pieces of [1 + max over placements of avg_(n-1)(new board)], where a piece that cannot be placed without creating a hole contributes zero to the sum. The code for this portion of the algorithm is given below:

import pyspark.sql.functions as sfn

def generate_avgs(height, width, spark_session):

    # This function uses the connection table created by generate_connections
    # to compute the average number of successful piece placements before a hole
    # is formed. This average is calculated with respect to optimal piece placement,
    # which is enforced via a recursive algorithm.

    all_connections = spark_session.read.format("parquet").load("connection_path")
    ids = spark_session.range((height + 1)**width)
    avgs = old_avgs = ids.withColumn("avg", sfn.lit(0.0)) # This is the "average" for a piece sequence of length 0
    max_depth = int(height*width / 4)
    for depth in range(max_depth + 1):
        if depth > 0: # Iterations greater than length 0 are generated from previously-saved results
            avgs.write.format("parquet").save(f"{width}-{height}/{depth}")
            old_avgs = spark_session.read.format("parquet").load(f"{width}-{height}/{depth}")
        print(f"Depth: {depth}")
        # The first step of the algorithm is to append the previous averages for each "new_id" column in the connections table
        connection_avgs = all_connections.alias("c").join(old_avgs.alias("a"), sfn.col("c.new_id") == sfn.col("a.id"), "left")
        connection_avgs = connection_avgs.select("c.id", "c.piece", "a.avg").fillna(0.0)
        # Then, the maximum avg is chosen for each starting configuration and piece
        best_choice = connection_avgs.groupBy("id", "piece").agg(sfn.max("avg").alias("max_avg"))
        # Finally, a weighted average of one plus the previous max averages is computed
        avgs = best_choice.select("id", (1/7 + sfn.col("max_avg")/7).alias("max_avg")).groupBy("id").agg(sfn.sum("max_avg").alias("avg"))

Unlike the connection algorithm from the previous section, the Spark operations here involve join and aggregation functions that require shuffling of the partition data. This is fundamentally unavoidable, as the data cannot be partitioned in a way that is efficient for both the extraction of previous averages (the join) and computation of the optimal piece placement (the aggregation) simultaneously.
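
For reference, a minimal driver that ties the two phases together might look like the sketch below. The session settings and the "connection_path" location are used here purely for illustration (generate_avgs, as written above, reads the connections from that literal path):

import pyspark.sql as sql

height, width = 20, 4

spark = sql.SparkSession.builder.config("spark.driver.memory", "15g").getOrCreate()

# Phase 1: tabulate every hole-free placement for every tower configuration
connections = generate_connections(height, width, spark)
connections.write.format("parquet").save("connection_path")

# Phase 2: recursively compute the optimal averages, saving one table per sequence length
generate_avgs(height, width, spark)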

Results

A single computer can comfortably compute results for a board width of around 6, but for simplicity we will look at a smaller playfield that is only 4 blocks wide. To validate the results of the algorithm, we can play a large number of Tetris games and compare the observed mean sequence length with the predicted length. This is implemented in the following code from play.py:

import random

import pyspark.sql as sql
import pyspark.sql.functions as sfn

from src import analyze

class Avgs():
    
    # This class handles the retrieval of averages from the tables computed
    # in generate.py.
    
    def __init__(self, spark, height, width, memory = False):
        
        # The class can operate in memory mode if the number of tower configurations is
        # small enough, or it can read data directly from the parquet files.
            
        self.board_avgs = spark.read.format("parquet").load("example_data/avgs_20-4")
        if memory:
            self.board_dict = {row.asDict()["id"]:row.asDict()["avg"] for row in self.board_avgs.toLocalIterator()}
        self.memory = memory

    def get(self, board_id):
        if self.memory:
            avg = float(self.board_dict.get(int(board_id), 0))
        else:
            rows = self.board_avgs.filter(sfn.col("id") == board_id).select("avg").collect()
            avg = rows[0].asDict()["avg"] if rows else 0
        return avg

def play_game(start_board, height, num_moves, avgs):
    
    # This function plays a game of Tetris using the
    # provided starting configuration, height, and number
    # of moves. For each piece, the placement with the
    # highest average is chosen.
    
    board = start_board
    moves = [start_board]
    for i in range(num_moves, 0, -1):
        piece = random.choice(list(analyze.pieces.keys()))
        connections = analyze.get_connections(board, height)
        choices = []
        for new_board in connections[piece]:
            board_id = analyze.get_board_id(new_board, height)
            avg = avgs.get(str(board_id))
            choices.append((new_board, avg))
        if len(choices) == 0:
            break
        board = max(choices, key = lambda tupl:tupl[1])[0]
        moves.append(board)
    return moves

def multiple_runs(runs, start_board, height, num_pieces, avgs):
    
    # This function plays a given number of Tetris games using the
    # specified parameters, and then prints the theoretical average,
    # observed average, and best performance.
    
    expected = avgs.get(analyze.get_board_id(start_board, height))
    move_lists = []
    for game in range(runs):
        moves = play_game(start_board, height, num_pieces, avgs)
        move_lists.append(moves)
        print(f"\rGame {game + 1} / {runs}", end = "")
    totals = [len(moves[1:]) for moves in move_lists]
    print("")
    print(f"Exp: {expected:.4f} | Avg: {sum(totals) / runs:.4f} | Best:{max(totals)}")
    return move_lists

board = 1
height = 20
width = 4
runs = int(1e5)
num_pieces = int(height*width / 4)

spark = sql.SparkSession.builder.config("spark.driver.memory", "15g").config('spark.ui.showConsoleProgress', False).getOrCreate()
avgs = Avgs(spark, height, width, memory = True)
start_board = analyze.get_board_columns(board, height, width)
move_lists = multiple_runs(runs, start_board, height, num_pieces, avgs)