001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadaptor;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.activemq.command.BaseCommand;
026import org.apache.activemq.kaha.Marshaller;
027import org.apache.activemq.util.ByteSequence;
028import org.apache.activemq.wireformat.WireFormat;
029
030/**
031 * Marshall a Transaction
032 * 
033 * 
034 */
035public class TransactionMarshaller implements Marshaller {
036
037    private WireFormat wireFormat;
038
039    public TransactionMarshaller(WireFormat wireFormat) {
040        this.wireFormat = wireFormat;
041
042    }
043
044    public void writePayload(Object object, DataOutput dataOut) throws IOException {
045        KahaTransaction kt = (KahaTransaction)object;
046        List list = kt.getList();
047        dataOut.writeInt(list.size());
048        for (int i = 0; i < list.size(); i++) {
049            TxCommand tx = (TxCommand)list.get(i);
050            Object key = tx.getMessageStoreKey();
051            ByteSequence packet = wireFormat.marshal(key);
052            dataOut.writeInt(packet.length);
053            dataOut.write(packet.data, packet.offset, packet.length);
054            Object command = tx.getCommand();
055            packet = wireFormat.marshal(command);
056            dataOut.writeInt(packet.length);
057            dataOut.write(packet.data, packet.offset, packet.length);
058
059        }
060    }
061
062    public Object readPayload(DataInput dataIn) throws IOException {
063        KahaTransaction result = new KahaTransaction();
064        List list = new ArrayList();
065        result.setList(list);
066        int number = dataIn.readInt();
067        for (int i = 0; i < number; i++) {
068            TxCommand command = new TxCommand();
069            int size = dataIn.readInt();
070            byte[] data = new byte[size];
071            dataIn.readFully(data);
072            Object key = wireFormat.unmarshal(new ByteSequence(data));
073            command.setMessageStoreKey(key);
074            size = dataIn.readInt();
075            data = new byte[size];
076            dataIn.readFully(data);
077            BaseCommand bc = (BaseCommand)wireFormat.unmarshal(new ByteSequence(data));
078            command.setCommand(bc);
079            list.add(command);
080        }
081        return result;
082
083    }
084}