Saturday, November 16, 2013

My experiments with TPL Dataflow

Another WOW technique from the house of Microsoft is the TPL Dataflow for dealing with collections in parallel utilizing the multiple CPU's available. I was doing some experiments with it recently and the code is below;

I implemented the below process using TPL;


class Program
    {
        static Employer employer = new Employer();
        static EmployeeCollection employees = new EmployeeCollection();
        static int i = 0;

        static void Main(string[] args)
        {
            LoadEmployees();
            PayEmployees();
        }

        static void LoadEmployees()
        {
            string fileName = @"C:\Users\m1003014\Desktop\employees.txt";
            var inputBlock = new BufferBlock();
            var processBlock = new TransformBlock(
                x =>
                {
                    Employee emp = null;
                    string[] values = x.Split(@";".ToCharArray());
                    BankAccount bankAccount = new BankAccount(Convert.ToDecimal(values[6]));
                    emp = new Employee(i++, values[0], values[1], Convert.ToDecimal(values[2]), bankAccount);
                    if (Convert.ToDecimal(values[3]) > 0)
                    {
                        emp.TakeAdvance(Convert.ToDecimal(values[3]), Convert.ToDecimal(values[4]), Convert.ToDecimal(values[5]));
                    }
                    return emp;
                });
            var outputBlock = new ActionBlock(
                x =>
                {
                    employees.Add(x);
                });

            using (inputBlock.LinkTo(processBlock))
            using (processBlock.LinkTo(outputBlock))
            {
                inputBlock.Completion.ContinueWith(t => processBlock.Complete());
                processBlock.Completion.ContinueWith(t => outputBlock.Complete());

                if (File.Exists(fileName))
                {
                    StreamReader sr = new StreamReader(fileName);
                    sr.ReadLine();
                    String line = sr.ReadLine();

                    do
                    {
                        inputBlock.Post(line);
                        line = sr.ReadLine();
                    } while (line != null);
                }
                inputBlock.Complete();
                outputBlock.Completion.Wait();
            }
        }

        static void PayEmployees()
        {
            var inputBlock = new BufferBlock();
            var processBlock = new TransformBlock(
                x =>
                {
                    employer.PaySalary(x);
                    return x;
                });
            var outputBlock = new ActionBlock(
                x =>
                {
                    Console.WriteLine(string.Format("The salary of '{1}' INR has been paid to '{0}'", x.FirstName, x.SalaryTobePaid));
                });
            using (inputBlock.LinkTo(processBlock))
            using (processBlock.LinkTo(outputBlock))
            {
                inputBlock.Completion.ContinueWith(t => processBlock.Complete());
                processBlock.Completion.ContinueWith(t => outputBlock.Complete());

                foreach (Employee emp in employees)
                {
                    inputBlock.Post(emp);
                }

                inputBlock.Complete();
                outputBlock.Completion.Wait();
            }
        }
    }

 public class Employee
    {
        int _id;
        string _firstName;
        string _lastName;
        decimal _salary;
        decimal _advanceTaken;
        decimal _monthlyDeduction;
        decimal _tenure;
        decimal _pendingTenure;
        decimal _advanceLeft;
        BankAccount _employeeBankAccount;

        public Employee(int id, string firstName, string lastName, decimal salary, BankAccount employeeBankAccount)
        {
            _id = id;
            _firstName = firstName;
            _lastName = lastName;
            _salary = salary;
            _employeeBankAccount = employeeBankAccount;
        }

        public int EmployeeID
        {
            get { return _id; }
        }

        public string FirstName
        {
            get { return _firstName; }
        }

        public string LastName
        {
            get { return _firstName; }
        }

        public decimal Salary
        {
            get { return _salary; }
        }

        public decimal AdvanceTaken
        {
            get { return _advanceTaken; }
        }

        public decimal AdvanceLeft
        {
            get { return _advanceLeft; }
        }
        public decimal SalaryTobePaid
        {
            get { return _salary - _monthlyDeduction; }
        }

        public BankAccount EmployeeBankAccount
        {
            get { return _employeeBankAccount; }
        }

        public void TakeAdvance(decimal amount, decimal monthlyDeduction, decimal tenure)
        {
            _advanceTaken = amount;
            _advanceLeft = amount;
            _monthlyDeduction = monthlyDeduction;
            _tenure = tenure;
            _pendingTenure = tenure;
        }

        public void UpdateAdvance()
        {
            _advanceLeft -= _monthlyDeduction;
            _pendingTenure -= 1;
        }

        public void CreditSalary()
        {
            _employeeBankAccount.Credit(SalaryTobePaid);
        }

        public bool HasTakenAdvance()
        {
            if (_pendingTenure > 0)
                return true;
            else
                return false;
        }
    }

    public class BankAccount
    {
        decimal _balance;

        public BankAccount(decimal balance)
        {
            _balance = balance;
        }

        public decimal Balance
        {
            get { return _balance; }
        }

        public void Credit(decimal amount)
        {
            _balance += amount;
        }

        public void Debit(decimal amount)
        {
            if (_balance - amount < 0)
                throw new Exception("Insufficient fund available");
            _balance -= amount;
        }
    }

    public class EmployeeCollection : KeyedCollection
    {
        public EmployeeCollection()
            : base()
        {
        }

        protected override int GetKeyForItem(Employee emp)
        {
            return emp.EmployeeID;
        }
    }

    public class MonthlySummary
    {
        int _totalEmployeesReceivingSalary;
        decimal _totalMonthlySalary;
        decimal _totalMonthlyRecovery;

        public int TotalEmployeesReceivingSalary
        {
            get { return _totalEmployeesReceivingSalary; }
        }

        public decimal TotalMonthlySalary
        {
            get { return _totalMonthlySalary; }
        }

        public decimal TotalMonthlyRecovery
        {
            get { return _totalMonthlyRecovery; }
        }

        public void UpdateThisEmployeeDetails(Employee emp)
        {
            _totalEmployeesReceivingSalary += 1;
            _totalMonthlySalary += emp.SalaryTobePaid;
            _totalMonthlyRecovery += emp.SalaryTobePaid - emp.Salary;
        }
    }

    public class Employer
    {
        BankAccount _employerBankAccount;
        MonthlySummary _monthlySummary;

        public Employer()
        {
            _employerBankAccount = new BankAccount(100000000);
            _monthlySummary = new MonthlySummary();
        }

        public void PaySalary(Employee emp)
        {
            _employerBankAccount.Debit(emp.SalaryTobePaid);
            emp.CreditSalary();
            _monthlySummary.UpdateThisEmployeeDetails(emp);
        }
    }

Sample input file;
FirstName;LastName;Salary;AdvanceTaken;MonthlyDeduction;Tenure;AccountBalance
Raja;vel;10000;0;0;0;200000
Bal;ji;10000;0;0;0;100000
test;test;10000;100000;5000;20;10000

No comments:

Post a Comment