001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage;
020import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame;
021
022import java.io.IOException;
023import java.io.Serializable;
024import java.io.StringReader;
025import java.io.StringWriter;
026import java.util.HashMap;
027import java.util.Locale;
028import java.util.Map;
029
030import javax.jms.JMSException;
031
032import org.apache.activemq.advisory.AdvisorySupport;
033import org.apache.activemq.broker.BrokerContext;
034import org.apache.activemq.broker.BrokerContextAware;
035import org.apache.activemq.command.ActiveMQMapMessage;
036import org.apache.activemq.command.ActiveMQMessage;
037import org.apache.activemq.command.ActiveMQObjectMessage;
038import org.apache.activemq.command.DataStructure;
039import org.apache.activemq.transport.stomp.Stomp.Headers;
040import org.apache.activemq.transport.stomp.Stomp.Responses;
041import org.apache.activemq.transport.stomp.Stomp.Transformations;
042import org.codehaus.jettison.mapped.Configuration;
043import org.fusesource.hawtbuf.UTF8Buffer;
044
045import com.thoughtworks.xstream.XStream;
046import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter;
047import com.thoughtworks.xstream.io.HierarchicalStreamReader;
048import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
049import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
050import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
051import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
052import com.thoughtworks.xstream.io.xml.XppReader;
053import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
054
055/**
056 * Frame translator implementation that uses XStream to convert messages to and
057 * from XML and JSON
058 */
059public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware {
060
061    XStream xStream = null;
062    BrokerContext brokerContext;
063
064    @Override
065    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
066        Map<String, String> headers = command.getHeaders();
067        ActiveMQMessage msg;
068        String transformation = headers.get(Headers.TRANSFORMATION);
069        if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) {
070            msg = super.convertFrame(converter, command);
071        } else {
072            HierarchicalStreamReader in;
073
074            try {
075                String text = new String(command.getContent(), "UTF-8");
076                switch (Transformations.getValue(transformation)) {
077                    case JMS_OBJECT_XML:
078                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
079                        msg = createObjectMessage(in);
080                        break;
081                    case JMS_OBJECT_JSON:
082                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
083                        msg = createObjectMessage(in);
084                        break;
085                    case JMS_MAP_XML:
086                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
087                        msg = createMapMessage(in);
088                        break;
089                    case JMS_MAP_JSON:
090                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
091                        msg = createMapMessage(in);
092                        break;
093                    default:
094                        throw new Exception("Unknown transformation: " + transformation);
095                }
096            } catch (Throwable e) {
097                command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage());
098                msg = super.convertFrame(converter, command);
099            }
100        }
101
102        copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
103        return msg;
104    }
105
106    @Override
107    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
108
109        StompFrame command = new StompFrame();
110        command.setAction(Responses.MESSAGE);
111        Map<String, String> headers = new HashMap<String, String>(25);
112        command.setHeaders(headers);
113
114        copyStandardHeadersFromMessageToFrame(converter, message, command, this);
115
116        String transformation = headers.get(Headers.TRANSFORMATION);
117
118        if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
119
120            if (Transformations.JMS_XML.equals(transformation)) {
121                headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString());
122            } else if (Transformations.JMS_JSON.equals(transformation)) {
123                headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString());
124            }
125
126            if (!headers.containsKey(Headers.TRANSFORMATION)) {
127                headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString());
128            }
129
130            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
131            command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8"));
132
133        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
134
135            if (Transformations.JMS_XML.equals(transformation)) {
136                headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString());
137            } else if (Transformations.JMS_JSON.equals(transformation)) {
138                headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString());
139            }
140
141            if (!headers.containsKey(Headers.TRANSFORMATION)) {
142                headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString());
143            }
144
145            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
146            command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8"));
147
148        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
149
150            if (Transformations.JMS_XML.equals(transformation)) {
151                headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString());
152            } else if (Transformations.JMS_JSON.equals(transformation)) {
153                headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString());
154            }
155
156            if (!headers.containsKey(Headers.TRANSFORMATION)) {
157                headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString());
158            }
159
160            String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION));
161            command.setContent(body.getBytes("UTF-8"));
162
163        } else {
164            command = super.convertMessage(converter, message);
165        }
166
167        return command;
168    }
169
170    /**
171     * Marshal the Object to a string using XML or JSON encoding
172     *
173     * @param object
174     *        the object to marshal
175     * @param transformation
176     *        the transformation to apply to the object.
177     *
178     * @returns the marshaled form of the given object, in JSON or XML.
179     *
180     * @throws JMSException if an error occurs during the marshal operation.
181     */
182    protected String marshall(Serializable object, String transformation) throws JMSException {
183        StringWriter buffer = new StringWriter();
184        HierarchicalStreamWriter out;
185        if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
186            out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
187        } else {
188            out = new PrettyPrintWriter(buffer);
189        }
190        getXStream().marshal(object, out);
191        return buffer.toString();
192    }
193
194    protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
195        ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
196        Object obj = getXStream().unmarshal(in);
197        objMsg.setObject((Serializable) obj);
198        return objMsg;
199    }
200
201    @SuppressWarnings("unchecked")
202    protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
203        ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
204        Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in);
205        for (String key : map.keySet()) {
206            mapMsg.setObject(key, map.get(key));
207        }
208        return mapMsg;
209    }
210
211    protected String marshallAdvisory(final DataStructure ds, String transformation) {
212
213        StringWriter buffer = new StringWriter();
214        HierarchicalStreamWriter out;
215        if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
216            out = new JettisonMappedXmlDriver().createWriter(buffer);
217        } else {
218            out = new PrettyPrintWriter(buffer);
219        }
220
221        XStream xstream = getXStream();
222        xstream.setMode(XStream.NO_REFERENCES);
223        xstream.aliasPackage("", "org.apache.activemq.command");
224        xstream.marshal(ds, out);
225        return buffer.toString();
226    }
227
228    // Properties
229    // -------------------------------------------------------------------------
230    public XStream getXStream() {
231        if (xStream == null) {
232            xStream = createXStream();
233        }
234        return xStream;
235    }
236
237    public void setXStream(XStream xStream) {
238        this.xStream = xStream;
239    }
240
241    // Implementation methods
242    // -------------------------------------------------------------------------
243    @SuppressWarnings("unchecked")
244    protected XStream createXStream() {
245        XStream xstream = null;
246        if (brokerContext != null) {
247            Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
248            for (XStream bean : beans.values()) {
249                if (bean != null) {
250                    xstream = bean;
251                    break;
252                }
253            }
254        }
255
256        if (xstream == null) {
257            xstream = XStreamSupport.createXStream();
258            xstream.ignoreUnknownElements();
259        }
260
261        // For any object whose elements contains an UTF8Buffer instance instead
262        // of a String type we map it to String both in and out such that we don't
263        // marshal UTF8Buffers out
264        xstream.registerConverter(new AbstractSingleValueConverter() {
265
266            @Override
267            public Object fromString(String str) {
268                return str;
269            }
270
271            @SuppressWarnings("rawtypes")
272            @Override
273            public boolean canConvert(Class type) {
274                return type.equals(UTF8Buffer.class);
275            }
276        });
277
278        xstream.alias("string", UTF8Buffer.class);
279
280        return xstream;
281    }
282
283    @Override
284    public void setBrokerContext(BrokerContext brokerContext) {
285        this.brokerContext = brokerContext;
286    }
287
288    @Override
289    public BrokerContext getBrokerContext() {
290        return this.brokerContext;
291    }
292
293    /**
294     * Return an Advisory message as a JSON formatted string
295     *
296     * @param ds
297     *        the DataStructure instance that is being marshaled.
298     *
299     * @return the JSON marshaled form of the given DataStructure instance.
300     */
301    protected String marshallAdvisory(final DataStructure ds) {
302        XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
303        xstream.setMode(XStream.NO_REFERENCES);
304        xstream.aliasPackage("", "org.apache.activemq.command");
305        return xstream.toXML(ds);
306    }
307}