# Chess Board Detection and Chess Piece Recognition

In [1]:
import numpy as np
import scipy.spatial as spatial
import scipy.cluster as clstr
from time import time
from collections import defaultdict
from functools import partial
from sklearn.utils import shuffle
import os, glob, caffe, skimage, cv2, shutil

SQUARE_SIDE_LENGTH = 227
categories = ['bb', 'bk', 'bn', 'bp', 'bq', 'br', 'empty', 'wb', 'wk', 'wn', 'wp', 'wq', 'wr']

## Board Detection

How it works:

1. Canny edge detection
2. Hough line transform
3. calculate intersection points
4. Agglomerative clustering of intersection points
5. find corners
6. perspective shift


In [2]:
def auto_canny(image, sigma=0.33):
    """
    Canny edge detection with automatic thresholds.
    """
    # compute the median of the single channel pixel intensities
    v = np.median(image)
 
    # apply automatic Canny edge detection using the computed median
    lower = int(max(0, (1.0 - sigma) * v))
    upper = int(min(255, (1.0 + sigma) * v))
    edged = cv2.Canny(image, lower, upper)
 
    # return the edged image
    return edged

def hor_vert_lines(lines):
    """
    A line is given by rho and theta. Given a list of lines, returns a list of
    horizontal lines (theta=90 deg) and a list of vertical lines (theta=0 deg).
    """
    h = []
    v = []
    for distance, angle in lines:
        if angle < np.pi / 4 or angle > np.pi - np.pi / 4:
            v.append([distance, angle])
        else:
            h.append([distance, angle])
    return h, v

def intersections(h, v):
    """
    Given lists of horizontal and vertical lines in (rho, theta) form, returns list
    of (x, y) intersection points.
    """
    points = []
    for d1, a1 in h:
        for d2, a2 in v:
            A = np.array([[np.cos(a1), np.sin(a1)], [np.cos(a2), np.sin(a2)]])
            b = np.array([d1, d2])
            point = np.linalg.solve(A, b)
            points.append(point)
    return np.array(points)

def cluster(points, max_dist=50):
    """
    Given a list of points, returns a list of cluster centers.
    """
    Y = spatial.distance.pdist(points)
    Z = clstr.hierarchy.single(Y)
    T = clstr.hierarchy.fcluster(Z, max_dist, 'distance')
    clusters = defaultdict(list)
    for i in range(len(T)):
        clusters[T[i]].append(points[i])
    clusters = clusters.values()
    clusters = map(lambda arr: (np.mean(np.array(arr)[:,0]), np.mean(np.array(arr)[:,1])), clusters)
    return clusters

def closest_point(points, loc):
    """
    Returns the list of points, sorted by distance from loc.
    """
    dists = np.array(map(partial(spatial.distance.euclidean, loc), points))
    return points[dists.argmin()]

def find_corners(points, img_dim):
    """
    Given a list of points, returns a list containing the four corner points.
    """
    center_point = closest_point(points, (img_dim[0] / 2, img_dim[1] / 2))
    points.remove(center_point)
    center_adjacent_point = closest_point(points, center_point)
    points.append(center_point)
    grid_dist = spatial.distance.euclidean(np.array(center_point), np.array(center_adjacent_point))
    
    img_corners = [(0, 0), (0, img_dim[1]), img_dim, (img_dim[0], 0)]
    board_corners = []
    tolerance = 0.25 # bigger = more tolerance
    for img_corner in img_corners:
        while True:
            cand_board_corner = closest_point(points, img_corner)
            points.remove(cand_board_corner)
            cand_board_corner_adjacent = closest_point(points, cand_board_corner)
            corner_grid_dist = spatial.distance.euclidean(np.array(cand_board_corner), np.array(cand_board_corner_adjacent))
            if corner_grid_dist > (1 - tolerance) * grid_dist and corner_grid_dist < (1 + tolerance) * grid_dist:
                points.append(cand_board_corner)
                board_corners.append(cand_board_corner)
                break
    return board_corners

def four_point_transform(img, points, square_length=SQUARE_SIDE_LENGTH):
    board_length = square_length * 8
    pts1 = np.float32(points)
    pts2 = np.float32([[0, 0], [0, board_length], [board_length, board_length], [board_length, 0]])
    M = cv2.getPerspectiveTransform(pts1, pts2)
    return cv2.warpPerspective(img, M, (board_length, board_length))

In [22]:
def find_board(fname):
    """
    Given a filename, returns the board image.
    """
    start = time()
    img = cv2.imread(fname)
    assert img is not None
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.blur(gray, (3, 3)) # TODO auto adjust the size of the blur
    
    # Canny edge detection
    edges = auto_canny(gray)
    assert np.count_nonzero(edges) / float(gray.shape[0] * gray.shape[1]) < 0.015

    # Hough line detection
    lines = cv2.HoughLines(edges, 1, np.pi/180, 200)
    lines = np.reshape(lines, (-1, 2))
    
    # Compute intersection points
    h, v = hor_vert_lines(lines)
    assert len(h) >= 9
    assert len(v) >= 9
    points = intersections(h, v)
        
    if False:
        for rho, theta in lines:
            a = np.cos(theta)
            b = np.sin(theta)
            x0 = a*rho
            y0 = b*rho
            x1 = int(x0 + 4000*(-b))
            y1 = int(y0 + 4000*(a))
            x2 = int(x0 - 4000*(-b))
            y2 = int(y0 - 4000*(a))
            cv2.line(img,(x1,y1),(x2,y2),(0,0,255),2)
        cv2.imwrite('lines.jpg', img)
    
    # Cluster intersection points
    points = cluster(points)
    
    # Find corners
    img_shape = np.shape(img)
    points = find_corners(points, (img_shape[1], img_shape[0]))
    
    if False:
        for point in points:
            cv2.circle(img, tuple(point), 25, (0,0,255), -1)
        cv2.imwrite('points.jpg', img)
    
    # Perspective transform
    new_img = four_point_transform(img, points)

    return new_img

def split_board(img):
    """
    Given a board image, returns an array of 64 smaller images.
    """
    arr = []
    sq_len = img.shape[0] / 8
    for i in range(8):
        for j in range(8):
            arr.append(img[i * sq_len : (i + 1) * sq_len, j * sq_len : (j + 1) * sq_len])
    return arr

cv2.imwrite('crop.jpg', find_board('./pictures_test/IMG_0977.jpg'))

True

## Generating Training/Test Data

In [4]:
labels = {}
with open('./labels.txt', 'r') as f:
    for line in f:
        if line[0] == '#':
            continue
        arr = line.split()
        assert len(arr) == 2
        assert arr[0] not in labels
        labels[arr[0]] = arr[1]

def get_label_arr(key):
    """
    Get the list of piece labels for the specific image number.
    
    key: The image number
    returns: List of piece labels, e.g. ['empty', 'wn', 'empty', ...]
    """
    fen = labels[key]
    fen = fen.replace('1', '_')
    fen = fen.replace('2', '__')
    fen = fen.replace('3', '___')
    fen = fen.replace('4', '____')
    fen = fen.replace('5', '_____')
    fen = fen.replace('6', '______')
    fen = fen.replace('7', '_______')
    fen = fen.replace('8', '________')
    fen = fen.replace('/', '')
    arr = []
    for char in fen:
        if char == '_':
            arr.append('empty')
        elif char == 'x':
            arr.append('del')
        elif char.islower():
            arr.append('b' + char.lower())
        else:
            arr.append('w' + char.lower())
    assert len(arr) == 64
    return arr

In [5]:
TRAIN_DIRS = {'input': './pictures_train/', 'intermediate': './crops_train/', 'output': './output_train/'}
TEST_DIRS = {'input': './pictures_test/', 'intermediate': './crops_test/', 'output': './output_test/'}

M_90deg_rotation = cv2.getRotationMatrix2D((SQUARE_SIDE_LENGTH / 2, SQUARE_SIDE_LENGTH / 2), 90, 1)

def write_augmented_output(output_dir, img_num, images, labels):
    for i in range(len(images)):
        if labels[i] == 'del':
            continue
        if not os.path.exists(output_dir + labels[i]):
            os.makedirs(output_dir + labels[i])
        img_90 = cv2.warpAffine(images[i], M_90deg_rotation, (SQUARE_SIDE_LENGTH, SQUARE_SIDE_LENGTH))
        img_180 = cv2.warpAffine(img_90, M_90deg_rotation, (SQUARE_SIDE_LENGTH, SQUARE_SIDE_LENGTH))
        img_270 = cv2.warpAffine(img_180, M_90deg_rotation, (SQUARE_SIDE_LENGTH, SQUARE_SIDE_LENGTH))
        cv2.imwrite(output_dir + labels[i] + '/' + img_num + '_' + str(i) + '.jpg', images[i])
        cv2.imwrite(output_dir + labels[i] + '/' + img_num + '_' + str(i) + '_90.jpg', img_90)
        cv2.imwrite(output_dir + labels[i] + '/' + img_num + '_' + str(i) + '_180.jpg', img_180)
        cv2.imwrite(output_dir + labels[i] + '/' + img_num + '_' + str(i) + '_270.jpg', img_270)

def gen_data(dir_dict):
    for path in dir_dict.values():
        assert path[-1] == '/'
    shutil.rmtree(dir_dict['intermediate'], True)
    shutil.rmtree(dir_dict['output'], True)
    os.makedirs(dir_dict['intermediate'])
    os.makedirs(dir_dict['output'])
    
    input_paths = glob.glob(dir_dict['input'] + '*.jpg')
    percent = 0
    for i in range(len(input_paths)):
        new_percent = 100 * float(i) / len(input_paths)
        if new_percent > percent + 5:
            percent = new_percent
            print str(percent) + '%'
        
        input_path = input_paths[i]
        input_img_num = input_path[input_path.rfind('_') + 1 : input_path.rfind('.')]
        board = find_board(input_path)
        cv2.imwrite(dir_dict['intermediate'] + input_img_num + '.jpg', board)
        piece_images = split_board(board)
        piece_labels = get_label_arr(input_img_num)
        write_augmented_output(dir_dict['output'], input_img_num, piece_images, piece_labels)
    print 'Done'


gen_data(TEST_DIRS)

10.0%
20.0%
30.0%
40.0%
50.0%
60.0%
70.0%
80.0%
90.0%
Done


Caffe list file

In [6]:
def get_label_for_fname(fname):
    for i in range(len(categories)):
        if '/' + categories[i] + '/' in fname:
            return str(i)
    raise

fnames = glob.glob(TRAIN_DIRS['output'] + '*/*.jpg')
fnames = shuffle(map(os.path.abspath, fnames))
with open('caffe_train.txt', 'w+') as f:
    for fname in fnames:
        f.write(fname + ' ' + get_label_for_fname(fname) + '\n')
        
fnames = glob.glob(TEST_DIRS['output'] + '*/*.jpg')
fnames = shuffle(map(os.path.abspath, fnames))
with open('caffe_test.txt', 'w+') as f:
    for fname in fnames:
        f.write(fname + ' ' + get_label_for_fname(fname) + '\n')

print 'Done'

Done


In [8]:
# Compute mean

fnames = glob.glob(TRAIN_DIRS['intermediate'] + '*.jpg')
means = None
for fname in fnames:
    this_mean = np.mean(np.reshape(cv2.imread(fname), (-1, 3)), axis=0)
    if means is None:
        means = this_mean
    else:
        means = np.vstack((means, this_mean))
means = np.mean(means, axis=0) # bgr
print means

[ 131.33953259  145.51087425  134.98660675]


## Predicting boards with the Caffe model

In [12]:
net = caffe.Net('/Users/daylenyang/caffe/models/finetune_chess/deploy.prototxt',
               '/Users/daylenyang/caffe/models/finetune_chess/finetune_chess_iter_5554.caffemodel',
               caffe.TEST)

transformer = caffe.io.Transformer({'data': net.blobs['data'].data.shape})
transformer.set_transpose('data', (2,0,1))
transformer.set_mean('data', np.load('/Users/daylenyang/caffe/python/caffe/imagenet/ilsvrc_2012_mean.npy').mean(1).mean(1)) # mean pixel
transformer.set_raw_scale('data', 255)  # the reference model operates on images in [0,255] range instead of [0,1]
transformer.set_channel_swap('data', (2,1,0))  # the reference model has channels in BGR order instead of RGB


In [13]:
def shrink_blanks(fen):
    if '_' not in fen:
        return fen
    new_fen = ''
    blanks = 0
    for char in fen:
        if char == '_':
            blanks += 1
        else:
            if blanks != 0:
                new_fen += str(blanks)
                blanks = 0
            new_fen += char
    if blanks != 0:
        new_fen += str(blanks)
    return new_fen

def get_fen(arr):
    fen = ''
    for sq in arr:
        if sq == 'empty':
            fen += '_'
        elif sq[0] == 'b':
            fen += sq[1]
        else:
            fen += str(sq[1]).upper()
    fens = [fen[i:i+8] for i in range(0, 64, 8)]
    fens = map(shrink_blanks, fens)
    fen = '/'.join(fens)
    return fen

def get_square_to_pieces_dict(prob_matrix):
    d = {}
    for i in range(len(prob_matrix)):
        d[i] = map(lambda x: categories[x], np.argsort(-prob_matrix[i]))
    return d

In [14]:
caffe.set_device(0)
caffe.set_mode_gpu()
start = time()

board = find_board('./pictures_test/IMG_0983.jpg')
squares = split_board(board)
print 'finished board splitting', time() - start, 'sec'
net.blobs['data'].reshape(64,3,227,227)
input_images = [transformer.preprocess('data', skimage.img_as_float(square).astype(np.float32)) for square in squares]
print 'finished input preprocessing', time() - start, 'sec'
net.blobs['data'].data[...] = np.array(input_images)
out = net.forward()['prob']
print get_fen(map(lambda x: categories[x], np.argmax(out, axis=1)))
print 'finished nn', time() - start, 'sec'
    
print 'took', time() - start, 'sec'

finished board splitting 0.352416992188 sec
finished input preprocessing 0.775801897049 sec
8/5k2/r7/3N1NPP/8/8/8/8
finished nn 1.6532959938 sec
took 1.65340399742 sec
