Saturday, February 7, 2009

Performing Asynchronous XML Serialization

If you work at a company anything like mine, you've had to deserialize a large XML file and been forced to sit there waiting for the Deserialize() method to complete. I haven't done a lot of work with threading, but I figured now was the time to start.

I decided to wrap the normal System.Xml.Serialization.XmlSerializer in a generic class that would encapsulate the threading. I also wanted it to perform some logging if there were issues during deserialization. The process starts with a factory method to return the serializer object.

        public static LoggedXMLSerializer<T> RunDeserializeAsync(string filePath) {
            return new LoggedXMLSerializer<T>(filePath);
        }


Which immediately calls the private constructor:

        private LoggedXMLSerializer(string filePath)
            : this() {
            DeserializingDelegate = PeformDeserialization;

            FileSize = new FileInfo(filePath).Length;
            // In memory, the XML tends to take up about 110% of the space it does on disk
            FileSize = (long)(FileSize * 1.10);

            TotalMemoryIncrease = 0;

            Result = DeserializingDelegate.BeginInvoke(filePath, null, null);
            StartingMemorySize = GetCurrentProcessMemoryInUse();
        }




The constructor first assigns the deserialization method to DeserializingDelegate. It then reads the size of the file being opened, which is used later to estimate how much work is left. For now, it only deserializes from a file path, but it could be extended to accept the other inputs that XmlSerializer.Deserialize() supports, such as a Stream or an XmlReader.

BeginInvoke() is then called on DeserializingDelegate, which starts the deserialization and returns an IAsyncResult object. BeginInvoke() does not run the delegate on the calling thread; it queues it to run on a thread-pool thread. You can then query the IAsyncResult object to see whether it has finished, or just call EndInvoke() and your primary thread will wait until the secondary thread finishes.

Immediately after the call is started, the current memory size of the process is stored. It is also used later to estimate the progress that is left.
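Here is a rough sketch of the start, poll, then collect pattern described above, pulled out on its own. One caveat: delegate BeginInvoke() is only supported on the .NET Framework, and on .NET Core and later it throws PlatformNotSupportedException, so this sketch swaps in Task.Run, which also hands the work to a thread-pool thread. AsyncPatternDemo and SlowSquare are made-up names standing in for a long Deserialize() call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncPatternDemo {
    // Hypothetical stand-in for a slow, synchronous call such as Deserialize()
    public static int SlowSquare(int value) {
        Thread.Sleep(200);
        return value * value;
    }

    public static int RunAndWait(int value) {
        // Comparable to BeginInvoke(): queue the work on a thread-pool thread
        Task<int> pending = Task.Run(() => SlowSquare(value));

        // Comparable to polling IAsyncResult.IsCompleted
        while (!pending.IsCompleted) {
            Thread.Sleep(50); // a real UI would update a progress bar here
        }

        // Comparable to EndInvoke(): blocks if the work is still running and
        // returns its value; a worker exception surfaces here wrapped in an
        // AggregateException
        return pending.Result;
    }
}
```

Calling AsyncPatternDemo.RunAndWait(7) keeps the calling thread free to do other things inside the loop and hands back the result once the worker is done.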

The deserialization method itself does exactly what you would do if you weren't invoking it on a separate thread: create an XmlSerializer object, assign an event handler for loading issues, open the file with a StreamReader, and call Deserialize().

        private T PeformDeserialization(string filePath) {
            XmlSerializer xs = new XmlSerializer(typeof(T));
            xs.UnknownNode += new XmlNodeEventHandler(Xs_UnknownNode);

            // Dispose the reader so the file handle is released once deserialization ends
            using (StreamReader reader = File.OpenText(filePath)) {
                return (T)xs.Deserialize(reader);
            }
        }
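
To see what the UnknownNode handler is for, here is a small self-contained sketch that deserializes from a string instead of a file. The Person type and the UnknownNodeDemo class are hypothetical and exist only for this example; the idea is simply to collect the names of nodes that the target type has no member for.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Hypothetical type used only for this example
public class Person {
    public string Name { get; set; }
}

public static class UnknownNodeDemo {
    public static List<string> FindUnknownNodes(string xml) {
        var unknown = new List<string>();
        var xs = new XmlSerializer(typeof(Person));
        // Raised for nodes in the XML that Person has no matching member for
        xs.UnknownNode += (sender, e) => unknown.Add(e.Name);

        using (var reader = new StringReader(xml)) {
            xs.Deserialize(reader);
        }
        return unknown;
    }
}
```

Passing in `<Person><Name>Ann</Name><Age>30</Age></Person>` should report Age, since Person only declares Name.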


Since the Deserialize() method is still synchronous, the hardest part has been coming up with a good method of determining the progress of the file load. This is what I came up with, but if you, dear reader, have a better idea, I'd like to hear it.

        /// <summary>
        /// Best guess at progress based on the size of the file, and the amount of increase in the memory of the process
        /// </summary>
        public int Progress {
            get {
                if (Result.IsCompleted) {
                    return 100;
                }
                long currentSize = GetCurrentProcessMemoryInUse();
                if (currentSize < StartingMemorySize + TotalMemoryIncrease) {
                    // The process is using less memory than the starting size plus the increase recorded so far.
                    // Assume this is due to a garbage collection between calls to Progress, and move the
                    // starting size down so that current size = starting size + recorded increase.
                    // This assumes that no additional memory was used to deserialize the XML in the meantime.
                    StartingMemorySize = currentSize - TotalMemoryIncrease;
                }
                else {
                    TotalMemoryIncrease = currentSize - StartingMemorySize;
                }

                int tempProgress = (int)((currentSize - StartingMemorySize) / (double)FileSize * 100);
                if (tempProgress < 0) {
                    tempProgress = 0;
                }

                if (tempProgress > 125) {
                    // Must have had a bad starting point, move it back to 75%
                    // (cast to long, not int: memory sizes can exceed int.MaxValue)
                    StartingMemorySize = (long)(currentSize - .75 * FileSize);
                    // Reinitialize Total Memory
                    TotalMemoryIncrease = currentSize - StartingMemorySize;
                    tempProgress = 75;
                }

                if (tempProgress >= 100) {
                    // Not finished yet, so never report 100 from the estimate
                    tempProgress = 99;
                }
                return tempProgress;
            }
        }



The first thing it does is check the IAsyncResult object to see whether it has completed. If it has, it returns 100%. Done.

The next part I added later, when I noticed that if I opened more than one file, the progress of the second file would move to about 25%, then drop down to near 0 and stay there until it finished. I'm guessing this is due to the garbage collector reclaiming a large amount of memory when the first deserialized object is released. When that happens, the starting memory size is moved down so that the increase recorded so far is preserved.

The basic progress calculation is then performed, on the assumption that the deserialized XML will take up nearly the same amount of memory as the serialized file: take the increase in memory size since deserialization began and divide it by the size of the file. Finally, some checks catch the cases where the progress has grown too large or is over 100%. It is not a perfect solution, but it was extremely simple to implement, and it serves my needs well.
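
To make the arithmetic concrete, here is the basic estimate reduced to a pure function. This is only a sketch: ProgressMath and EstimatePercent are hypothetical names, and it leaves out the garbage-collection adjustment and the 125% reset that the real property performs.

```csharp
using System;

public static class ProgressMath {
    // Memory growth since the start divided by the padded file size,
    // as a percentage kept in the range 0 to 99.
    public static int EstimatePercent(long startingMemory, long currentMemory, long fileSizeOnDisk) {
        // Same 110% padding that the constructor applies to the file size
        long expectedGrowth = (long)(fileSizeOnDisk * 1.10);
        if (expectedGrowth <= 0) {
            return 0; // avoid dividing by zero for an empty file
        }
        double percent = (currentMemory - startingMemory) / (double)expectedGrowth * 100;
        // 100 is reserved for a completed load, so cap the estimate at 99
        return (int)Math.Max(0, Math.Min(99, percent));
    }
}
```

For a 100,000,000 byte file the expected growth is 110,000,000 bytes, so a process that has grown by 55,000,000 bytes since the start is estimated at 50%.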

Below is the entire class. Feel free to make comments.

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.IO;
    using System.Text;
    using System.Xml.Serialization;

    public class LoggedXMLSerializer<T> {
        private delegate T Deserializer(string path);

        private Deserializer DeserializingDelegate { get; set; }
        private IAsyncResult Result { get; set; }
        private long StartingMemorySize { get; set; }
        private long TotalMemoryIncrease { get; set; }
        private long FileSize { get; set; }
        private Dictionary<string, string> XmlUnknowns { get; set; }

        #region Public Properties

        public T Xml { get; protected set; }

        /// <summary>
        /// Best guess at progress based on the size of the file, and the amount of increase in the memory of the process
        /// </summary>
        public int Progress {
            get {
                if (Result.IsCompleted) {
                    return 100;
                }
                long currentSize = GetCurrentProcessMemoryInUse();
                if (currentSize < StartingMemorySize + TotalMemoryIncrease) {
                    // The process is using less memory than the starting size plus the increase recorded so far.
                    // Assume this is due to a garbage collection between calls to Progress, and move the
                    // starting size down so that current size = starting size + recorded increase.
                    // This assumes that no additional memory was used to deserialize the XML in the meantime.
                    StartingMemorySize = currentSize - TotalMemoryIncrease;
                }
                else {
                    TotalMemoryIncrease = currentSize - StartingMemorySize;
                }

                int tempProgress = (int)((currentSize - StartingMemorySize) / (double)FileSize * 100);
                if (tempProgress < 0) {
                    tempProgress = 0;
                }

                if (tempProgress > 125) {
                    // Must have had a bad starting point, move it back to 75%
                    // (cast to long, not int: memory sizes can exceed int.MaxValue)
                    StartingMemorySize = (long)(currentSize - .75 * FileSize);
                    // Reinitialize Total Memory
                    TotalMemoryIncrease = currentSize - StartingMemorySize;
                    tempProgress = 75;
                }

                if (tempProgress >= 100) {
                    // Not finished yet, so never report 100 from the estimate
                    tempProgress = 99;
                }
                return tempProgress;
            }
        }

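        // EndInvoke() may only be called once per BeginInvoke(), so remember when it has been done
        private bool resultCollected;

        /// <summary>
        /// Returns true when the XML has finished being deserialized
        /// Returns false if it hasn't
        /// </summary>
        public bool IsCompleted {
            get {
                if (!Result.IsCompleted) {
                    return false;
                }
                if (!resultCollected) {
                    // Set the flag first so a failed deserialization throws only once
                    resultCollected = true;
                    Xml = DeserializingDelegate.EndInvoke(Result);
                }
                return true;
            }
        }

        #endregion // Public Properties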

        /// <summary>
        /// Returns a list of all unknown nodes found in the XML in this format
        /// Entity Name, First Occurrence Line Number, First Occurrence Line Position
        /// </summary>
        /// <returns></returns>
        public string GetLog() {
            StringBuilder sb = new StringBuilder();
            foreach (var item in XmlUnknowns) {
                sb.Append(item.Key + ", " + item.Value + Environment.NewLine);
            }
            return sb.ToString();
        }


        private LoggedXMLSerializer()
            : base() { // Force Factory Method Use
            XmlUnknowns = new Dictionary<string, string>();
        }

        private LoggedXMLSerializer(string filePath)
            : this() {
            DeserializingDelegate = PeformDeserialization;

            FileSize = new FileInfo(filePath).Length;
            // In memory, the XML tends to take up about 110% of the space it does on disk
            FileSize = (long)(FileSize * 1.10);

            TotalMemoryIncrease = 0;

            Result = DeserializingDelegate.BeginInvoke(filePath, null, null);
            StartingMemorySize = GetCurrentProcessMemoryInUse();
        }

        /// <summary>
        /// Deserializes the XML on a different thread. Use IsCompleted and Progress to determine status
        /// </summary>
        /// <param name="filePath">Path of the XML file to deserialize</param>
        /// <returns>A serializer whose deserialization is already running</returns>
        public static LoggedXMLSerializer<T> RunDeserializeAsync(string filePath) {
            return new LoggedXMLSerializer<T>(filePath);
        }

        private T PeformDeserialization(string filePath) {
            XmlSerializer xs = new XmlSerializer(typeof(T));
            xs.UnknownNode += new XmlNodeEventHandler(Xs_UnknownNode);

            // Dispose the reader so the file handle is released once deserialization ends
            using (StreamReader reader = File.OpenText(filePath)) {
                return (T)xs.Deserialize(reader);
            }
        }

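        private long GetCurrentProcessMemoryInUse() {
            // Process is disposable, so release it once the working set has been read
            using (Process process = Process.GetCurrentProcess()) {
                return process.WorkingSet64;
            }
        }

        // Records only the first occurrence of each unknown node name
        private void Xs_UnknownNode(object sender, XmlNodeEventArgs e) {
            if (!XmlUnknowns.ContainsKey(e.Name)) {
                XmlUnknowns.Add(e.Name, e.LineNumber + ", " + e.LinePosition);
            }
        }
    }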

1 comment:

Unknown said...

If you want to monitor progress, instead of simply calling File.OpenText to get your stream, derive a class from IO.Stream, or maybe even StreamReader, to which you could add events that fire as bytes go through. This would also allow you to calculate the processing speed and, knowing the original file size, the percentage of progress and even the ETA.
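
The commenter's suggestion could look roughly like the following. This is only a sketch under a few assumptions: ProgressStream and its ProgressChanged event are hypothetical names, it wraps an existing stream rather than deriving from StreamReader, it is read-only, and it assumes the wrapped stream can report its Length (a FileStream can).

```csharp
using System;
using System.IO;

// Hypothetical read-only wrapper that counts bytes as they are read
public class ProgressStream : Stream {
    private readonly Stream inner;
    private long bytesRead;

    public ProgressStream(Stream inner) { this.inner = inner; }

    // Raised after each successful read with the percentage consumed so far
    public event Action<int> ProgressChanged;

    public override int Read(byte[] buffer, int offset, int count) {
        int n = inner.Read(buffer, offset, count);
        bytesRead += n;
        if (n > 0 && inner.Length > 0 && ProgressChanged != null) {
            ProgressChanged((int)(bytesRead * 100 / inner.Length));
        }
        return n;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { return inner.Length; } }
    public override long Position {
        get { return bytesRead; }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    protected override void Dispose(bool disposing) {
        if (disposing) { inner.Dispose(); }
        base.Dispose(disposing);
    }
}
```

An instance wrapping File.OpenRead(filePath) could be passed to the Deserialize() overload that takes a Stream. Two things to keep in mind: the event fires on whichever thread is doing the reading, and it measures bytes pulled from the file, which can run slightly ahead of what the serializer has actually parsed because of buffering.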