Thursday, 30 September 2010

Asserting Function Calls in Python

One of the nicest features of Python is “duck typing”, which means you don’t need to create interfaces to allow you to swap out implementations. Instead you simply create a different object that has the functions you need.

One really powerful use of this is in unit testing, allowing you to create lightweight replacements for dependencies without the need for a powerful mocking framework. Having said that, sometimes you need to be able to do things like checking that a function was called on an existing object. I asked about this on StackOverflow, and got a variety of different approaches to this problem.

Thanks to another feature of Python, sometimes called “monkey patching” you can take any object and replace an existing function with your own. This is obviously very powerful (and potentially dangerous) but it opens up all sorts of possibilities.

Here’s an example of monkey patching to replace the existing implementation of MyFunc with a lambda expression that simply counts how many times it was called.

def testMyFunc():
    obj = MyObject()
    calls = 0
    obj.MyFunc = lambda: calls += 1
    # DoSomething should call MyFunc
    DoSomething(obj)
    assert calls == 1

To take this one step further, we might wish to still call through to the original implementation of MyFunc. We can simply this by creating a helper class:

class MethodCallLogger(object):
    def __init__(self, meth):
        self.meth = meth
        self.CallCount = 0

    def __call__(self, *args):
        self.meth(*args)
        self.CallCount += 1

This class will call through to the original function, as well as count how many times it was called. The __call__ function is a way of allowing a class to be called as though it were a function. The *args syntax simply lets us support functions with multiple parameters. These could then be saved into a list and made available to the unit test if necessary. Here’s our first example again, using the MethodCallLogger class:

def testMyFunc():
    obj = MyObject()
    logger = MethodCallLogger(obj.MyFunc)
    obj.MyFunc = logger
    # DoSomething should call MyFunc
    DoSomething(obj)
    assert logger.CallCount == 1

Wednesday, 29 September 2010

Convert 16 bit PCM to IEEE float

NAudio has had the Wave32Stream for quite some time which converts a 16 bit PCM stream into a stereo IEEE floating point stream, with optional panning and volume. However, it could do with something simpler, that doesn’t automatically convert to stereo. So here is a preliminary implementation of an IWaveProvider that converts 16 bit PCM to IEEE float. It keeps the Volume property as that is always useful to have available. It keeps the code nice and clean by making use of the WaveBuffer class. I’ll probably add this to NAudio in the near future.

/// <summary>
/// Converts 16 bit PCM to IEEE float, optionally adjusting volume along the way
/// </summary>
public class Wave16toIeeeProvider : IWaveProvider
{
    private IWaveProvider sourceProvider;
    private readonly WaveFormat waveFormat;
    private volatile float volume;
    private byte[] sourceBuffer;

    /// <summary>
    /// Creates a new Wave16toIeeeProvider
    /// </summary>
    /// <param name="sourceStream">the source stream</param>
    /// <param name="volume">stream volume (1 is 0dB)</param>
    /// <param name="pan">pan control (-1 to 1)</param>
    public Wave16toIeeeProvider(IWaveProvider sourceProvider)
    {
        if (sourceProvider.WaveFormat.Encoding != WaveFormatEncoding.Pcm)
            throw new ApplicationException("Only PCM supported");
        if (sourceProvider.WaveFormat.BitsPerSample != 16)
            throw new ApplicationException("Only 16 bit audio supported");

        waveFormat = WaveFormat.CreateIeeeFloatWaveFormat(sourceProvider.WaveFormat.SampleRate, sourceProvider.WaveFormat.Channels);

        this.sourceProvider = sourceProvider;
        this.volume = 1.0f;
    }

    /// <summary>
    /// Helper function to avoid creating a new buffer every read
    /// </summary>
    byte[] GetSourceBuffer(int bytesRequired)
    {
        if (sourceBuffer == null || sourceBuffer.Length < bytesRequired)
        {
            sourceBuffer = new byte[bytesRequired];
        }
        return sourceBuffer;
    }

    /// <summary>
    /// Reads bytes from this wave stream
    /// </summary>
    /// <param name="destBuffer">The destination buffer</param>
    /// <param name="offset">Offset into the destination buffer</param>
    /// <param name="numBytes">Number of bytes read</param>
    /// <returns>Number of bytes read.</returns>
    public int Read(byte[] destBuffer, int offset, int numBytes)
    {
        int sourceBytesRequired = numBytes / 2;
        byte[] sourceBuffer = GetSourceBuffer(sourceBytesRequired);
        int sourceBytesRead = sourceProvider.Read(sourceBuffer, offset, sourceBytesRequired);
        WaveBuffer sourceWaveBuffer = new WaveBuffer(sourceBuffer);
        WaveBuffer destWaveBuffer = new WaveBuffer(destBuffer);

        int sourceSamples = sourceBytesRead / 2;
        int destOffset = offset / 4;
        for (int sample = 0; sample < sourceSamples; sample++)
        {
            destWaveBuffer.FloatBuffer[destOffset++] = (sourceWaveBuffer.ShortBuffer[sample] / 32768f) * volume;
        }

        return sourceSamples * 4;
    }

    /// <summary>
    /// <see cref="IWaveProvider.WaveFormat"/>
    /// </summary>
    public WaveFormat WaveFormat
    {
        get { return waveFormat; }
    }

    /// <summary>
    /// Volume of this channel. 1.0 = full scale
    /// </summary>
    public float Volume
    {
        get { return volume; }
        set { volume = value; }
    }
}

Tuesday, 28 September 2010

Countdown Kata in Python

One of my favourite programming exercises is solving the “countdown” numbers game. Basically, you are given a set of input numbers and have to solve the target by adding, multiplying, subtracting or dividing them (using each input number only once).

As before, this isn’t an ideal solution, as I’m still getting to grips with Python. It uses recursion to find the first solution. I don’t keep track of the closest answer yet.

import unittest

class SolverUnitTests(unittest.TestCase):
    testCases = ( 
        (0, [], True),
        (1, [], False),
        (1, [1], True),
        (2, [1], False),
        (2, [1,1], True),
        (2, [1,7], False),
        (3, [1,1,1], True),
        (1, [3,2], True), # subtract
        (1, [2,3], True),
        (6, [2,3], True), # multiply
        (7, [2,3], False), 
        (4, [8,2], True), # divide
        (4, [2,8], True), # divide
        (4, [9,2], False), 
        (14, [1,7,1], True), # add and multiply
        (18, [1,7,3], True), # subtract and multiply
        (100, [11, 1, 11, 1], True),
        (2010, [25, 4, 2, 10, 5, 2], True),
        (2011, [25, 4, 2, 10, 5, 2], True),
        (2012, [25, 4, 2, 10, 5, 2], True),
        (2013, [25, 4, 2, 10, 5, 2], True),
        (2014, [25, 4, 2, 10, 5, 2], True),
        (16, [2,2,2], False)
        )

    def testCanSolve(self):
        for (target, numbers, solveable) in self.testCases:
            print 'solving', target, 'with', numbers
            solver = Solver(target)
            self.assertEqual(solveable, solver.Solve(numbers))
        
def add(first,second):
    answer = first + second
    return (answer, '%d+%d=%d' % (first,second,answer))

def subtract(first,second):
    answer = first - second
    if answer < 0:
        answer = 0
    return (answer, '%d-%d=%d'  % (first,second,answer))

def multiply(first,second):
    answer = first * second
    if answer < 0:
        answer = 0
    return (answer, '%d*%d=%d'  % (first,second,answer))

def divide(first,second):
    if (second == 0) or ( first % second != 0):
        answer = 0
    else:
        answer = first / second
    return (answer, '%d/%d=%d'  % (first,second,answer))

def pairs(list):
    for i in range(len(list)):
        for j in range(i+1,len(list)):
            yield (list[i],list[j])

class Solver:
    def __init__(self, target):
        self.target = target
        self.operations = (add, subtract, multiply, divide)
        
    def Solve(self, numbers):
        if (self.target in numbers) or (self.target == 0):
            return True
        return self.SolveList(numbers, '')
    
    def SolveList(self, numbers, solutionSoFar):
        numbers.sort(reverse=True)
        for (first, second) in pairs(numbers):
            for func in self.operations:
                (newNumber,solution) = func(first,second)
                if newNumber == self.target:
                    print self.target, ':', solutionSoFar + ', ' + solution
                    return True
                elif newNumber:
                    newList = list(numbers)
                    newList.remove(first)
                    newList.remove(second)
                    newList.append(newNumber)
                    #print 'retry with', newList
                    if self.SolveList(newList, solutionSoFar + ', ' + solution):
                        return True
        return False

IronPython Codebreaker Katacast

As promised I recorded a quick katacast of myself using the IronPython continuous testing script I blogged about while I solve the codebreaker kata in Python. Don’t expect super fast coding or best practices – I’m still very much an IronPython newbie, but I have improved the solution slightly over my original offering. There were a few other refactorings I intended to make but I decided that 10 minutes was long enough.

I’m afraid I haven’t dubbed any classical music onto the recording (it would be incongruous to combine beautiful music with my ugly code). I used Expression Encoder 3 for the screen recording – for some reason Expression Encoder 4 doesn’t work on my computer (makes the recorded area go white making it completely impossible to do anything). You may notice ValueError come up on occasions after I save. I still don't know what causes this, but I simply save again and IronPython successfully reloads and runs the tests. Sadly it looks like the aspect ratio has somehow got squashed in the process of uploading to Vimeo, but it’s still readable.

Here’s the code:

import unittest

class MarkerTests(unittest.TestCase):
    def testNoMatch(self):
        marker = Marker('rgby')
        mark = marker.Mark('xxxx')
        self.assertEqual('', mark)

    def testOneImperfectMatch(self):
        marker = Marker('rgby')
        mark = marker.Mark('xrxx')
        self.assertEqual('m', mark)

    def testTwoImperfectMatches(self):
        marker = Marker('rgby')
        mark = marker.Mark('xrgx')
        self.assertEqual('mm', mark)

    def testImperfectMatchNotDoubleCounted(self):
        marker = Marker('rgby')
        mark = marker.Mark('xrrx')
        self.assertEqual('m', mark)

    def testOnePerfectMatch(self):
        marker = Marker('rgby')
        mark = marker.Mark('xgxx')
        self.assertEqual('p', mark)

    def testOnePerfectOneImperfectMatch(self):
        marker = Marker('rgby')
        mark = marker.Mark('xgxb')
        self.assertEqual('pm', mark)

    def testOnePerfectOnly(self):
        marker = Marker('rgby')
        mark = marker.Mark('rrrr')
        self.assertEqual('p', mark)

    def testAllPerfect(self):
        marker = Marker('rgby')
        mark = marker.Mark('rgby')
        self.assertEqual('pppp', mark)

        
class Marker:
    def __init__(self, answer):
        self.answer = answer
        
    def Mark(self, guess):
        perfectMatches = self.CountPerfectMatches(guess)
        anyPositionMatches = self.CountAnyPositionMatches(guess)
        return perfectMatches * 'p' + (anyPositionMatches - perfectMatches) * 'm'
        
    def CountPerfectMatches(self, guess):
        return sum([a == b for (a,b) in zip(guess, self.answer)])

    def CountAnyPositionMatches(self, guess):
        count = 0
        answerList = list(self.answer)
        for c in guess:
            if c in answerList:
                count += 1
                answerList.remove(c)
        return count

Friday, 24 September 2010

Autotest for IronPython

Continuing my explorations of IronPython, I decided I wanted a continuous test setup, which would automatically run my unit tests every time I saved a .py file, which was something I had seen on various “katacasts”. After a bit of investigation, I found the promising looking modipyd, which seemed to be Windows friendly. Unfortunately, github won’t let me download the files, so I set about creating my own basic continuous test tool for IronPython.

One advantage of using IronPython is that it immediately gives me access to the .NET framework’s FileSystemWatcher, which meant I didn’t have to worry about learning threading in Python. I did however have to work around one quirk which meant that the file changed event could get triggered multiple times, despite a single save command in my code editor.
Another challenge was working out how to load or reload a module given its name. This is done with the __import__ function, and using the sys.modules dictionary for the reload.

It took slightly longer than I hoped to get it fully working. Occasionally I get a spurious ValueError thrown when it attempts a reload. I’m not sure what that is all about. It should also be improved to rerun tests on all loaded modules not just the one that changed if you are not working entirely within a single file.
Again, any Python gurus feel free to suggest improvements.

import unittest
import clr
import sys
from System.IO import FileSystemWatcher
from System.IO import NotifyFilters
from System import DateTime

def changed(sender, args):
    global lastFileTimeWatcherEventRaised
    if DateTime.Now.Subtract(lastFileTimeWatcherEventRaised).TotalMilliseconds < 500:
        return
    moduleName = args.Name[:-3]
    if reloadModule(moduleName):
        runTests(moduleName)
    lastFileTimeWatcherEventRaised = DateTime.Now

def reloadModule(moduleName):
    loaded = False
    try:
        if(sys.modules.has_key(moduleName)):
            print 'Reloading ' + moduleName    
            reload(sys.modules[moduleName])
        else:
            print 'Importing ' + moduleName
            __import__(moduleName)
        loaded = True
    except SyntaxError, s:
        print 'Syntax error loading ' + s.filename, 'line', s.lineno, 'offset', s.offset
        print s.text
    except:
        #sometimes get a ValueError here, not sure why
        error = sys.exc_info()[0]
        print error
    return loaded

def runTests(moduleName):
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(sys.modules[moduleName])
    if suite.countTestCases() > 0:
        runner = unittest.TextTestRunner()
        runner.run(suite)
    else:
        print 'No tests in module'

def watch(path):
    watcher = FileSystemWatcher()
    watcher.Filter = '*.py'
    watcher.Changed += changed
    watcher.Path = path
    watcher.NotifyFilter = NotifyFilters.LastWrite
    watcher.EnableRaisingEvents = 1
    
lastFileTimeWatcherEventRaised = DateTime.Now

if __name__ == '__main__':
    print 'Watching current folder for changes...'
    watch('.')
    input('press Enter to exit')

If I get a chance I’ll record my own “katacast” showing the autotest python module in action.

Update: I've made the katacast. I've also made a slight improvement to the autotest code, moving the setting of lastFileTimeWatcherEventRaised further down to stop long-running tests thwarting the multiple-event filtering.

Thursday, 23 September 2010

Getting Started With IronPython

My first experience of Python was not a good one. I was working on a project to automate the testing of some telecoms equipment. This meant calling a lot of COM objects, which, back in 2003 at least, Python was not very good at. Also, the rudimentary Windows IDE available for Python at the time had a very annoying habit of mixing tabs and spaces, which meant that the indentation level you saw was not necessarily the indentation level you got. The other annoyance was regularly discovering syntax errors in my error reporting code, resulting in the reason for the failure of the overnight test run being lost forever.

But since Microsoft have never really offered a good scripting language for .NET, I decided to revisit Python in the form of IronPython. I’ve been slowly working my way through IronPython in Action, and trying to get back up to speed with the syntax (this online tutorial is very helpful).

As a simple way in, I decided to solve the “codebreaker” kata (basically the Mastermind game). Here are a few of the rudimentary issues I hit along the way.

First, get yourself a command prompt in the folder you are writing your .py file. The windows shortcut to the IronPython console will put you in the wrong place. If IronPython is not already in your path, enter:

set path=%PATH%;"c:\Program Files\IronPython 2.7\" 

This will allow you to type either ipy to get the IronPython console, or ipy filename.py to run your script directly.

Second, IronPython 2.7 Alpha 1 seems to have a bug calling import unittest. This means that you can’t make use of the built-in unit test support that Python has. I had to switch to normal Python to carry on (although I suspect IronPython 2.6 would have worked too).

Third, the unit test support in Python sadly doesn’t support the equivalent to NUnit’s [TestCase] attribute, meaning that parameterized unit tests aren’t supported (without writing some very clever code). There is a feature request filed against Python for this. For the time being I made use of a list of tuples to store my test data.

Fourth, there seems to be no find method for a list (although there is on string). You can use index but it will throw an exception if the item is not found.

In case you are interested in my (very sub-optimal) solution, the code follows. Without a doubt there are better ways to do this in Python. Please feel free to offer suggestions for improvement in the comments below.

import unittest

class CodeBreakerTest(unittest.TestCase):
    testcases = (
        ('xxxx',''),
        ('bxxx','m'),
        ('xbxx','m'),
        ('xxyx','m'),
        ('xxxb','m'),
        ('ybxx','mm'),
        ('xxrb','mm'),
        ('ybrx','mmm'),
        ('ybrg','mmmm'),
        ('bbxx','m'),
        ('rxxx','p'),
        ('xgxx','p'),
        ('xxbx','p'),
        ('xxxy','p'),
        ('rgxx','pp'),
        ('rgbx','ppp'),
        ('rgby','pppp'),
        ('rbxx','pm'),
        ('rgyx','ppm'),
        ('rbgy','ppmm') )
    
    def testAll(self):
        marker = Marker('rgby')
        for guess, answer in self.testcases:
            print 'Testing "' + guess + '", expecting "' + answer + '"'
            mark = marker.Mark(guess)
            self.assertEquals(answer, mark)
            
    def test2(self):
        marker = Marker('rggg')
        guess = 'rgyy'
        answer = 'pp'
        mark = marker.Mark(guess)
        self.assertEquals(answer, mark)
        
    def test3(self):
        marker = Marker('rgxx')
        guess = 'rggg'
        answer = 'pp'
        mark = marker.Mark(guess)
        self.assertEquals(answer, mark)

class Marker(object):
    def __init__(self, secret):
        self.secret = secret
        
    def Mark(self, guess):
        perfect = self.PerfectMatch(guess)
        wrongPos = self.WrongPositionMatch(guess)
        wrongPos = wrongPos[len(perfect):]
        return perfect + wrongPos
    
    def PerfectMatch(self, guess):
        answer = ''
        for i in range(len(guess)):
            if self.secret[i] == guess[i]:
                answer += 'p'
        return answer
    
    def WrongPositionMatch(self, guess):
        answer = ''
        secretList = [x for x in self.secret]
        for c in guess:
            index = self.Find(secretList,c)
            if index != -1:
                answer += 'm'
                secretList[index] = []
        return answer

    def Find(self, list, search):
        for i in range(len(list)):
            if (list[i] == search):
                return i
        return -1

if __name__ == '__main__':
    unittest.main()

Wednesday, 22 September 2010

Unit Testing Object Persistence

Most applications have the need to save data to disk in order to reload it later. Very often this simply means the use of a database, particularly when you are dealing with server side programming. But those of us doing predominantly client side development often need to save data to custom file formats, or perhaps XML, in order to reload it at a later date. One of the trickiest challenges that surrounds this is how to ensure that future versions of your application can still successfully load in data saved in earlier versions. There are two types of test you need to write, to be sure you have got your persistence code right.

Roundtrip Testing

The first type of test is to take an object (or object graph), save it to a temporary file, and reload it in. Then you need to assert that the exported object is identical to the imported one (an overridden Equals method can be helpful here).

It is important to make sure you cover every possible special case that can be exported, particularly every class that might need to be serialized at some point. Here’s a very simple example of a round-trip test:

string fileName = "test.tmp";
Widget exported = new Widget();
exported.Name = "xyz";
exported.Weight = 20;
WidgetExporter.Export(exported, fileName);
Widget imported = WidgetImporter(fileName);
Assert.AreEqual(exported.Name, imported.Name);
Assert.AreEqual(exported.Weight, imported.Weight);

Legacy Import Testing

There are lots of gotchas surrounding preserving the ability to import data from legacy versions of your application. These are particularly tricky if you use .NET’s built-in XML or binary serialisation. While they can cope with new fields or fields being removed, when properties change their type, or move from one class into a sub-class, it can break horribly.

So the second type of test needed is to import some real exported data. What is needed is a store of real exported data from every version of your software has ever been in the hands of a customer. If you can automate the creation of such data, all the better. Again, you need to ensure that your test data includes an example of every possible type of exported object.

Ideally, your unit tests would go through each file, import it, and meticulously check that all the properties of the imported object are set correctly. In practice, this can be too time consuming to write all the necessary assert statements.

Typically we just choose a few representative files to check thoroughly. But there is still value in importing everything else. Often, deserialization code will throw exceptions on errors, so simply successfully importing several hundreds of files even without checking their contents is a worthwhile test.

Future Proof Serialization

One last piece of advice. Choose file formats and deserialization code that are very flexible to change. There is nothing worse than not being able to change a class or object hierarchy because it will break serialization. Where possible use XML, as it is much easier to handle wholesale changes to schemas down the line.

Friday, 17 September 2010

Push Complexity to the Edges

I blogged a while back about “technical debt interest rates” where I argued that not all “technical debt” is created equal (and by technical debt, I am meaning code that is hard to maintain, e.g because it is overly complex or tightly coupled). Sometimes shortcuts make you pay in the long run, but other times they turn out to be a smart move after all. This raises the question of whether you can know how risky the technical debt you are introducing actually is.

Of course, without the ability to accurately predict the future, you can never know, but I want to propose a simple rule of thumb. The closer your compromised code is to the core of your codebase, the greater penalties it will incur.

Imagine an application whose architecture looks like this:

Clean Architecture

Here we have a fairly clean architecture where the core part of the application talks to three modules which are all isolated from each other. Suppose for a minute that Module A contains terribly complicated code because it was rushed out the door in a hurry. The technical debt it contains won’t actually cause us any pain at all if we need to extend Module B or Module C, or even add a new Module D. That is because it is isolated from the rest of the application.

However, consider a more realistic version of what happens when technical debt is introduced:

Complexity in the Core

Here, the code for feature A was not isolated into its own module, but is inextricably intertwined with the core code. Now we are in big trouble. Because although we may not want to make any changes to feature A, anyone who works on the core of our application has to deal with the added complexity that is in there.

If this seems obvious, that’s because it is. After all, the very compromise being made when introducing technical debt is often that we don’t have time to separate the new functionality out into its own isolated module. However, the time required to extract feature A out afterwards is much greater than doing it right first time, and becomes almost impossible after the same mistake has been made several times over.

The key is to recognise when you are introducing complexity into the core of your application. This is technical debt that will be very expensive. A plugin-in architecture, on the other hand, can allow you to have several isolated areas of complexity that may not require the debt to be paid back. This is why it makes sense to start new applications with a loosely coupled, extensible architecture, rather than deciding you will plumb one in at a later date.

Tuesday, 14 September 2010

Codebase Reformation

I am currently looking into how the architecture of a very large software product (now over 1 million lines of code) can be improved to make it more maintainable and extensible into the future. Inevitably on a project of its size, many of the assumptions and expectations held at the beginning have not proved to be correct. The product has been extended in ways that no one could have foreseen. Sometimes large amounts of technical debt has been introduced as we rushed to meet a vital deadline. New technologies have emerged (it was started as a .NET 1.1 product) which are more suited to the task than anything that was around back then. And additionally, those of us on the development team have grown as programmers during the time. What looked like good code to us back then now looks decidedly poor.

So my task over the last few months has been to produce a document of recommendations about what architectural changes should be made to improve things. I have a good idea of where we are now, and where we ought to be going. But there is one thing that concerns me, and it is summed up well in the following quote:

The reformer is always right about what is wrong. He is generally wrong about what is right. —G.K. Chesterton

In other words, it is one thing to look at a codebase and observe all the things that are wrong about it. It is another thing entirely to know what the correct solution is. Experience tells us that simply adopting a new technology (“let’s use IoC containers”, “let’s use WPF”) will typically solve one set of problems but introduce another set.

The correct approach in my view is to recognise that we are trying to move between two moving targets. In other words, “where we are” is always moving, since any living codebase is being continually worked on. But also, “where we want to be” is also a moving target, as we grow in our understanding of what the system needs to do, and what constitutes a well-architected system.

Bearing this in mind, it is a mistake therefore to imagine that you can, or should, attempt to “fix” the architecture of a large system in one gigantic refactoring session. There may be a case for making certain major changes to prepare the way for specific new features, and address significant areas of technical debt, but in my view, the best approach to codebase reformation is continual refactoring, allowing our vision of where we are heading to be modified as our horizons expand.