<?xml version="1.0" encoding="UTF-8"?><rss version="2.0"
	xmlns:content="http://purl.org/rss/1.0/modules/content/"
	xmlns:wfw="http://wellformedweb.org/CommentAPI/"
	xmlns:dc="http://purl.org/dc/elements/1.1/"
	xmlns:atom="http://www.w3.org/2005/Atom"
	xmlns:sy="http://purl.org/rss/1.0/modules/syndication/"
	xmlns:slash="http://purl.org/rss/1.0/modules/slash/"
	>

<channel>
	<title>Marçal Serrate</title>
	<atom:link href="http://www.serrate.net/feed/" rel="self" type="application/rss+xml" />
	<link>http://www.serrate.net</link>
	<description>denormalized and distributed by design</description>
	<lastBuildDate>Tue, 15 Mar 2016 08:04:07 +0000</lastBuildDate>
	<language>en-US</language>
	<sy:updatePeriod>hourly</sy:updatePeriod>
	<sy:updateFrequency>1</sy:updateFrequency>
	<!--built on the Whiteboard Framework-->
	<item>
		<title>Preview of Kafka Streams</title>
		<link>http://www.serrate.net/2016/03/15/preview-of-kafka-streams/</link>
		<comments>http://www.serrate.net/2016/03/15/preview-of-kafka-streams/#respond</comments>
		<pubDate>Tue, 15 Mar 2016 01:02:29 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Blog]]></category>
		<category><![CDATA[Featured]]></category>
		<category><![CDATA[BigData]]></category>
		<category><![CDATA[Kafka]]></category>
		<category><![CDATA[Storm]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=335</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p>The preview of <strong>Kafka Streams</strong>, which is one of the main features of the upcoming <strong>Apache Kafka 0.10</strong>, was <a href="http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple" target="_blank">announced by Jay Kreps this week</a>.</p>
<h3>Kafka joins the Stream Processing club</h3>
<p>Kafka Streams is a library to build streaming applications using Kafka topics as input/output. Kafka Streams is in the same league as other stream processing systems such as <strong>Apache Storm</strong>, <strong>Apache Flink</strong> and, not surprisingly, <strong>Apache Samza</strong>, which also uses Kafka for the input or output of streams.</p>
<p>One of the main advantages is that, if you&#8217;re already using Apache Kafka and you need real-time processing, you just need to add the library and you are good to go.<br />
Other important features are: <strong>stateful</strong> processing, <strong>windowing</strong> and the ability to be deployed using your preferred solution: a simple command line, <strong>Mesos</strong>, <strong>YARN</strong> or <strong>Kubernetes</strong> and <strong>Docker</strong> if you&#8217;re a container party boy.</p>
<h3>Streams and Tables</h3>
<p>One of the key concepts in Kafka Streams is the support for <strong>KStream</strong> and <strong>KTable</strong>.<br />
That isn&#8217;t a new concept if you come from the <strong><a href="http://www.serrate.net/2015/02/17/speaking-at-dotnetspain-conference/" target="_blank">Event Sourcing</a></strong> world: the KStream is the append-only event store, whose state is obtained by replaying the events from the beginning of time until the last event, whereas the KTable is the snapshot or projection of the stream&#8217;s state at a given point in time.</p>
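<p>The stream/table duality can be made concrete with a small in-memory sketch in plain Java. It does not use the Kafka Streams API: a list of keyed events stands in for the append-only KStream, and replaying that list up to a given offset into a map yields the KTable snapshot at that point in time. The StreamTableDuality class, the Event type and the tableAt method are hypothetical names used only for illustration.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the stream/table duality; NOT the Kafka Streams API.
final class StreamTableDuality {

    // One immutable record in the append-only log (the "stream").
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the log from offset 0 up to (but excluding) offset `upTo`.
    // Later events for a key overwrite earlier ones, so the result is the
    // snapshot (the "table") of the stream at that point in time.
    static Map<String, Long> tableAt(List<Event> log, int upTo) {
        Map<String, Long> table = new HashMap<>();
        for (int offset = 0; offset < upTo && offset < log.size(); offset++) {
            Event e = log.get(offset);
            table.put(e.key, e.value);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> log = new ArrayList<>();
        log.add(new Event("kafka", 1));
        log.add(new Event("storm", 1));
        log.add(new Event("kafka", 2));

        System.out.println(tableAt(log, 2).get("kafka")); // 1
        System.out.println(tableAt(log, 3).get("kafka")); // 2
    }
}
```

<p>Replaying the first two events gives kafka=1; replaying all three gives kafka=2, because a table keeps only the latest value per key while the stream keeps every event.</p>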
<h3>Example: Twitter Hashtags Job</h3>
<p style="text-align: center;"><a href="http://www.serrate.net/files/2016/03/KafkaStreams-2.png" rel="attachment wp-att-352"><img class="wp-image-352 aligncenter" src="http://www.serrate.net/files/2016/03/KafkaStreams-2-1024x411.png" alt="KafkaStreams" width="749" height="303" /></a></p>
<p class="wp-caption-text" style="text-align: center;">Kafka Streams: KStream and KTable</p>
<h5>Show me the code!</h5>
<p>You can find the complete example here: <a href="https://github.com/mserrate/kafka-streams-app" target="_blank">https://github.com/mserrate/kafka-streams-app</a></p>
<pre class="brush: java; title: ; notranslate">
KStream&lt;String, JsonNode&gt; source = builder.stream(stringDeserializer, jsonDeserializer, &quot;streams-hashtag-input&quot;);

KTable&lt;String, Long&gt; counts = source
        .filter(new HashtagFilter())
        .flatMapValues(new HashtagSplitter())
        .map(new HashtagMapper())
        .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, &quot;Counts&quot;);

counts.to(&quot;streams-hashtag-count-output&quot;, stringSerializer, longSerializer);
</pre>
<p>For this example I&#8217;ve been using a simple TweetProducer that connects to the <strong>Twitter Streaming API</strong> and sends JSON events to a Kafka topic.<br />
This topic is read as a KStream and then we begin the process:</p>
<ol>
<li>Filter out the tweets without hashtags</li>
<li>Apply a flatMapValues (we are just interested in the values, not the keys) to split the different hashtags in a tweet</li>
<li>Apply a map that returns a key-value pair where both the key and the value are the hashtag, as we want to aggregate by hashtag</li>
<li>Aggregate the streams per key (the hashtag) and count them</li>
</ol>
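<p>To see what each of the four steps does to the data, the pipeline can be mimicked with java.util.stream. This is an in-memory analogy under simplifying assumptions, not Kafka Streams code: tweets are reduced to a hypothetical Tweet type that only carries its hashtags, and the result is a plain map rather than a continuously updated KTable.</p>

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// In-memory analogy of the hashtag job; the real job uses KStream/KTable.
final class HashtagPipelineSketch {

    // Hypothetical, simplified tweet: only its hashtags matter here.
    static final class Tweet {
        final List<String> hashtags;

        Tweet(String... hashtags) {
            this.hashtags = Arrays.asList(hashtags);
        }
    }

    static Map<String, Long> countHashtags(List<Tweet> tweets) {
        return tweets.stream()
                // 1. filter: drop tweets without hashtags
                .filter(t -> !t.hashtags.isEmpty())
                // 2. flatMapValues: one element per hashtag in the tweet
                .flatMap(t -> t.hashtags.stream())
                // 3. map: (hashtag, hashtag) pair so the hashtag becomes the key
                .map(h -> new AbstractMap.SimpleEntry<>(h, h))
                // 4. countByKey: number of occurrences per key, sorted by key
                .collect(Collectors.groupingBy(Map.Entry::getKey, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Tweet> tweets = Arrays.asList(
                new Tweet("kafka", "streams"),
                new Tweet(),
                new Tweet("kafka"));
        System.out.println(countHashtags(tweets)); // {kafka=2, streams=1}
    }
}
```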
<p>Finally, we write the KTable to the output topic.</p>
]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2016/03/15/preview-of-kafka-streams/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Sentiment analysis of tweets</title>
		<link>http://www.serrate.net/2016/02/07/sentiment-analysis-of-tweets/</link>
		<comments>http://www.serrate.net/2016/02/07/sentiment-analysis-of-tweets/#respond</comments>
		<pubDate>Sun, 07 Feb 2016 15:40:34 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Blog]]></category>
		<category><![CDATA[BigData]]></category>
		<category><![CDATA[Storm]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=294</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p>In the previous <a href="http://www.serrate.net/2016/01/05/analysis-of-twitter-streams-with-kafka-and-storm/" target="_blank">post</a> I have presented an overview of the topology used to analyse twitter streams with <strong>Kafka</strong> and <strong>Storm</strong>. Now it&#8217;s time to cover the technical details of the twitter topology.</p>
<h3>Twitter Topology</h3>
<p style="text-align: center;"><a href="http://www.serrate.net/files/2016/02/TwitterTopology_Analysis.png" rel="attachment wp-att-297"><img class="wp-image-309 aligncenter" src="http://www.serrate.net/files/2016/02/TwitterTopology_Analysis.png" alt="" width="550" height="254" /></a></p>
<p>Here is the declaration of the Storm topology, which uses a KafkaSpout to read the tweets from a Kafka topic:</p>
<pre class="brush: java; title: ; notranslate">
public class TwitterProcessorTopology extends BaseTopology {

    public TwitterProcessorTopology(String configFileLocation) throws Exception {
        super(configFileLocation);
    }

    private void configureKafkaSpout(TopologyBuilder topology) {
        BrokerHosts hosts = new ZkHosts(topologyConfig.getProperty(&quot;zookeeper.host&quot;));

        SpoutConfig spoutConfig = new SpoutConfig(
                hosts,
                topologyConfig.getProperty(&quot;kafka.twitter.raw.topic&quot;),
                topologyConfig.getProperty(&quot;kafka.zkRoot&quot;),
                topologyConfig.getProperty(&quot;kafka.consumer.group&quot;));
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        topology.setSpout(&quot;twitterSpout&quot;, kafkaSpout);
    }

    private void configureBolts(TopologyBuilder topology) {
        // filtering
        topology.setBolt(&quot;twitterFilter&quot;, new TwitterFilterBolt(), 4)
                .shuffleGrouping(&quot;twitterSpout&quot;);

        // sanitization
        topology.setBolt(&quot;textSanitization&quot;, new TextSanitizationBolt(), 4)
                .shuffleGrouping(&quot;twitterFilter&quot;);

        // sentiment analysis
        topology.setBolt(&quot;sentimentAnalysis&quot;, new SentimentAnalysisBolt(), 4)
                .shuffleGrouping(&quot;textSanitization&quot;);

        // persist tweets with analysis to Cassandra
        topology.setBolt(&quot;sentimentAnalysisToCassandra&quot;, new SentimentAnalysisToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping(&quot;sentimentAnalysis&quot;);

        // divide sentiment by hashtag
        topology.setBolt(&quot;hashtagSplitter&quot;, new HashtagSplitterBolt(), 4)
                .shuffleGrouping(&quot;textSanitization&quot;);

        // persist hashtags to Cassandra
        topology.setBolt(&quot;hashtagCounter&quot;, new HashtagCounterBolt(), 4)
                .fieldsGrouping(&quot;hashtagSplitter&quot;, new Fields(&quot;tweet_hashtag&quot;));

        topology.setBolt(&quot;topHashtag&quot;, new TopHashtagBolt())
                .globalGrouping(&quot;hashtagCounter&quot;);

        topology.setBolt(&quot;topHashtagToCassandra&quot;, new TopHashtagToCassandraBolt(topologyConfig), 4)
                .shuffleGrouping(&quot;topHashtag&quot;);
    }

    private void buildAndSubmit() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        configureKafkaSpout(builder);
        configureBolts(builder);

        Config config = new Config();

        //set producer properties
        Properties props = new Properties();
        props.put(&quot;metadata.broker.list&quot;, topologyConfig.getProperty(&quot;kafka.broker.list&quot;));
        props.put(&quot;request.required.acks&quot;, &quot;1&quot;);
        props.put(&quot;serializer.class&quot;, &quot;kafka.serializer.StringEncoder&quot;);
        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology(&quot;twitter-processor&quot;, config, builder.createTopology());
    }

    public static void main(String[] args) throws Exception {
        String configFileLocation = args[0];

        TwitterProcessorTopology topology = new TwitterProcessorTopology(configFileLocation);
        topology.buildAndSubmit();
    }
}

</pre>
<p><span id="more-294"></span></p>
<h4>1. Filter Bolt</h4>
<p>First of all, we filter the tweets that we are interested in. As we only perform sentiment analysis on English tweets, we filter on the lang property:</p>
<pre class="brush: java; title: ; notranslate"> 
public class TwitterFilterBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(TwitterFilterBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            JSONObject object = (JSONObject)JSONValue.parseWithException(tuple.getString(0));

            if (object.containsKey(&quot;lang&quot;) &amp;&amp; &quot;en&quot;.equals(object.get(&quot;lang&quot;))) {
                long id = (long)object.get(&quot;id&quot;);
                String text = (String)object.get(&quot;text&quot;);
                String createdAt = (String)object.get(&quot;created_at&quot;);
                JSONObject entities = (JSONObject)object.get(&quot;entities&quot;);
                JSONArray hashtags = (JSONArray)entities.get(&quot;hashtags&quot;);
                HashSet&lt;String&gt; hashtagList = new HashSet&lt;String&gt;();
                for (Object hashtag : hashtags) {
                    hashtagList.add(((String)((JSONObject)hashtag).get(&quot;text&quot;)).toLowerCase());
                }

                collector.emit(new Values(id, text, hashtagList, createdAt));
            }
            else {
                LOG.debug(&quot;Ignoring non-english tweets&quot;);
            }

        } catch (ParseException e) {
            LOG.error(&quot;Error parsing tweet: &quot; + e.getMessage());
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(&quot;tweet_id&quot;, &quot;tweet_text&quot;, &quot;tweet_hashtags&quot;, &quot;tweet_created_at&quot;));
    }
}
</pre>
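<p>The core filtering rule (keep English tweets, lower-case their hashtags and collect them in a set) can be exercised without Storm or a JSON parser. In this sketch the already-parsed fields are passed in directly, an assumption made only to keep it self-contained; EnglishHashtagFilter is a hypothetical name. Note how the set collapses Storm and storm into a single hashtag.</p>

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Sketch of the TwitterFilterBolt rule without Storm or json-simple.
final class EnglishHashtagFilter {

    // Returns the tweet's hashtags, lower-cased and de-duplicated, or an empty
    // Optional when the tweet is not in English and should be dropped.
    // "en".equals(lang) also covers a missing (null) lang field.
    static Optional<Set<String>> filter(String lang, List<String> rawHashtags) {
        if (!"en".equals(lang)) {
            return Optional.empty();
        }
        Set<String> hashtags = new HashSet<>();
        for (String hashtag : rawHashtags) {
            hashtags.add(hashtag.toLowerCase());
        }
        return Optional.of(hashtags);
    }
}
```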
<h4>2. Sanitization Bolt</h4>
<p>Then, we sanitise our tweets by converting accented characters into unaccented ones, replacing every run of characters that are neither letters nor digits (punctuation, symbols, emoji) with a single space, and lower-casing the text:</p>
<pre class="brush: java; title: ; notranslate">
public class TextSanitizationBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(TextSanitizationBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {

        String text = tuple.getString(1);
        String normalizedText = Normalizer.normalize(text, Normalizer.Form.NFD);
        text = normalizedText.replaceAll(&quot;\\p{InCombiningDiacriticalMarks}+&quot;, &quot;&quot;);
        text = text.replaceAll(&quot;[^\\p{L}\\p{Nd}]+&quot;, &quot; &quot;).toLowerCase();

        collector.emit(new Values(
                tuple.getLongByField(&quot;tweet_id&quot;),
                text,
                tuple.getValueByField(&quot;tweet_hashtags&quot;),
                tuple.getStringByField(&quot;tweet_created_at&quot;)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(&quot;tweet_id&quot;, &quot;tweet_text&quot;, &quot;tweet_hashtags&quot;, &quot;tweet_created_at&quot;));
    }
}
</pre>
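<p>Extracting the normalization into a pure function makes its behaviour easy to check without a running topology. The sketch below repeats the bolt&#8217;s three steps; TextSanitizer is a hypothetical name.</p>

```java
import java.text.Normalizer;

// Same normalization steps as TextSanitizationBolt, as a pure function.
final class TextSanitizer {

    static String sanitize(String text) {
        // Decompose accented characters (e.g. an accented e becomes e + combining mark)...
        String decomposed = Normalizer.normalize(text, Normalizer.Form.NFD);
        // ...then drop the combining marks, leaving only the base letters.
        String unaccented = decomposed.replaceAll("\\p{InCombiningDiacriticalMarks}+", "");
        // Replace every run of non-letter, non-digit characters with one space.
        return unaccented.replaceAll("[^\\p{L}\\p{Nd}]+", " ").toLowerCase();
    }
}
```

<p>For instance, sanitize("Café, déjà-vu!! #Storm 2016") returns "cafe deja vu storm 2016": accents are removed and each run of punctuation becomes a single space, while digits such as 2016 are kept.</p>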
<h4>3a. Sentiment Analysis Bolt</h4>
<p>In this bolt we score the tweet word by word using <a href="http://sentiwordnet.isti.cnr.it/" target="_blank">SentiWordNet</a>. That&#8217;s not the best way to do it: because each word is classified independently, it can produce false positives or negatives and it ignores the tweet context, sarcasm, etc. But that&#8217;s ok for a sample <img src="https://s.w.org/images/core/emoji/2.3/72x72/1f642.png" alt="🙂" class="wp-smiley" style="height: 1em; max-height: 1em;" /></p>
<pre class="brush: java; title: ; notranslate">
public class SentimentAnalysisBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SentimentAnalysisBolt.class);
    SentiWordNet sentiWordNet;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            sentiWordNet = SentiWordNet.getInstance();
        } catch (IOException e) {
            LOG.error(&quot;Problem parsing SentiWordNet file: &quot; + e.getMessage());
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        double count = 0;
        String text = tuple.getStringByField(&quot;tweet_text&quot;);

        try {
            String delimiters = &quot;\\W&quot;;
            String[] tokens = text.split(delimiters);
            double feeling = 0;
            for (int i = 0; i &lt; tokens.length; ++i) {
                if (!tokens[i].isEmpty()) {
                    // Search as adjective
                    feeling = sentiWordNet.extract(tokens[i], &quot;a&quot;);
                    count += feeling;
                }
            }

            LOG.info(&quot;text: &quot; + text + &quot; count: &quot; + count);
        }
        catch (Exception e) {
            LOG.error(&quot;Problem found when classifying the text: &quot; + e.getMessage());
        }

        collector.emit(new Values(
                tuple.getLongByField(&quot;tweet_id&quot;),
                text,
                count,
                tuple.getValueByField(&quot;tweet_hashtags&quot;),
                tuple.getStringByField(&quot;tweet_created_at&quot;)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(&quot;tweet_id&quot;, &quot;tweet_text&quot;, &quot;tweet_sentiment&quot;, &quot;tweet_hashtags&quot;, &quot;tweet_created_at&quot;));
    }
}
</pre>
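<p>The scoring loop boils down to summing a per-word polarity looked up in a lexicon. The sketch below swaps SentiWordNet for a tiny in-memory lexicon whose words and scores are made up for illustration (LexiconScorer is a hypothetical name). It also makes the word-by-word limitation visible: "not good" still scores as positive.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Word-by-word polarity scoring; the lexicon is a hypothetical stand-in for SentiWordNet.
final class LexiconScorer {

    private final Map<String, Double> lexicon = new HashMap<>();

    LexiconScorer() {
        // Made-up adjective scores in [-1, 1], for illustration only.
        lexicon.put("good", 0.75);
        lexicon.put("great", 1.0);
        lexicon.put("bad", -0.625);
    }

    // Sums the score of every token; words missing from the lexicon count as 0.
    double score(String sanitizedText) {
        double total = 0;
        for (String token : sanitizedText.split("\\W")) {
            if (!token.isEmpty()) {
                total += lexicon.getOrDefault(token, 0.0);
            }
        }
        return total;
    }
}
```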
<h4>4a. Save results to Cassandra</h4>
<p>Finally, we store the tweet in Cassandra along with its score and its hashtags, if any:</p>
<pre class="brush: java; title: ; notranslate">
public class SentimentAnalysisToCassandraBolt extends CassandraBaseBolt {
    private static final Logger LOG = LoggerFactory.getLogger(SentimentAnalysisToCassandraBolt.class);

    public SentimentAnalysisToCassandraBolt(Properties properties) {
        super(properties);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {

        HashSet&lt;String&gt; hashtags = (HashSet&lt;String&gt;)tuple.getValueByField(&quot;tweet_hashtags&quot;);

        Statement statement = QueryBuilder.update(&quot;tweet_sentiment_analysis&quot;)
                .with(QueryBuilder.set(&quot;tweet&quot;, tuple.getStringByField(&quot;tweet_text&quot;)))
                .and(QueryBuilder.set(&quot;sentiment&quot;, tuple.getDoubleByField(&quot;tweet_sentiment&quot;)))
                .and(QueryBuilder.addAll(&quot;hashtags&quot;, hashtags))
                .and(QueryBuilder.set(&quot;created_at&quot;, tuple.getStringByField(&quot;tweet_created_at&quot;)))
                .where(QueryBuilder.eq(&quot;tweet_id&quot;, tuple.getLongByField(&quot;tweet_id&quot;)));

        LOG.debug(statement.toString());

        session.execute(statement);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
</pre>
<h4>3b. Hashtag Splitter Bolt</h4>
<p>That branch of the topology graph is responsible for splitting the different hashtags and emitting a tuple per hashtag to the next bolt. That&#8217;s why we inherit from BaseRichBolt: it lets us anchor each emitted hashtag to the incoming tweet tuple and manually ACK that tuple once all its hashtags have been emitted.</p>
<pre class="brush: java; title: ; notranslate">
public class HashtagSplitterBolt extends BaseRichBolt {
    OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        HashSet&lt;String&gt; hashtags = (HashSet&lt;String&gt;)tuple.getValueByField(&quot;tweet_hashtags&quot;);

        for (String hashtag : hashtags) {
            // anchor to the input tuple so that a downstream failure triggers a replay
            collector.emit(tuple, new Values(hashtag));
        }

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(&quot;tweet_hashtag&quot;));
    }
}
</pre>
<h4>4b. Hashtag Counter Bolt</h4>
<p>In this particular case, we are using <strong>Fields Grouping</strong> because we want to partition the stream by hashtag. It means that the same hashtag will always go to the same task. Thus, we can use a hashmap to count the number of occurrences of a hashtag:</p>
<p style="text-align: center;"><a href="http://www.serrate.net/files/2016/02/HashtagSplit.png" rel="attachment wp-att-313"><img class="wp-image-313 aligncenter" src="http://www.serrate.net/files/2016/02/HashtagSplit.png" alt="Hashtag Split" width="180" height="228" srcset="http://www.serrate.net/files/2016/02/HashtagSplit.png 255w, http://www.serrate.net/files/2016/02/HashtagSplit-236x300.png 236w" sizes="(max-width: 180px) 100vw, 180px" /></a></p>
<pre class="brush: java; title: ; notranslate">
public class HashtagCounterBolt extends BaseBasicBolt {
    private static final Logger LOG = LoggerFactory.getLogger(HashtagCounterBolt.class);
    private Map&lt;String, Long&gt; hashtag_count = new HashMap&lt;String, Long&gt;();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String hashtag = tuple.getStringByField(&quot;tweet_hashtag&quot;);
        Long count = hashtag_count.get(hashtag);

        if (count == null)
            count = 0L;

        count++;
        hashtag_count.put(hashtag, count);

        collector.emit(new Values(hashtag, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(&quot;hashtag&quot;, &quot;count&quot;));
    }
}
</pre>
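<p>Why a plain per-task HashMap is enough can be shown with a simplified model of fields grouping: each hashtag is routed to the task given by its hash modulo the number of tasks, so every occurrence of a hashtag reaches the same counter. Storm&#8217;s real routing differs in detail; FieldsGroupingModel, taskFor and route are hypothetical names.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of fields grouping feeding per-task hashtag counters.
final class FieldsGroupingModel {

    private final List<Map<String, Long>> taskCounters = new ArrayList<>();

    FieldsGroupingModel(int numTasks) {
        for (int i = 0; i < numTasks; i++) {
            // Each task owns a private map, like one HashtagCounterBolt instance.
            taskCounters.add(new HashMap<>());
        }
    }

    // Deterministic routing: the same hashtag always maps to the same task.
    int taskFor(String hashtag) {
        return Math.floorMod(hashtag.hashCode(), taskCounters.size());
    }

    // Routes one hashtag occurrence and returns the updated count on its task.
    long route(String hashtag) {
        Map<String, Long> counter = taskCounters.get(taskFor(hashtag));
        return counter.merge(hashtag, 1L, Long::sum);
    }
}
```

<p>With shuffle grouping instead, two occurrences of the same hashtag could reach different tasks, and each task&#8217;s map would only hold a partial count.</p>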
<h4>5b. Top Hashtag Bolt</h4>
<p>For this bolt, we use <strong>Global Grouping</strong> to get the top 20 hashtags. Global grouping means that all the (hashtag, count) tuples go to a single task of the bolt. For this use case we need a sliding window in order to emit the top 20 hashtags every 10 seconds, and we rely on the Storm <strong>Tick Tuple</strong> feature for that: for normal tuples we just update the ranking of hashtags and, when a tick tuple is received (configured to arrive every 10 seconds), we emit the current ranking.</p>
<p style="text-align: center;"><a href="http://www.serrate.net/files/2016/02/TopN.png" rel="attachment wp-att-314"><img class="wp-image-314 aligncenter" src="http://www.serrate.net/files/2016/02/TopN.png" alt="Top N" width="209" height="221" /></a></p>
<pre class="brush: java; title: ; notranslate">
public class TopHashtagBolt extends BaseBasicBolt {
    List&lt;List&gt; rankings = new ArrayList&lt;List&gt;();
    private static final Logger LOG = LoggerFactory.getLogger(TopHashtagBolt.class);
    private static final Integer TOPN = 20;
    private static final Integer TICK_FREQUENCY = 10;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        if (isTickTuple(tuple)) {
            LOG.debug(&quot;Tick: &quot; + rankings);
            collector.emit(new Values(new ArrayList(rankings)));
        } else {
            rankHashtag(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(&quot;tophashtags&quot;));
    }

    @Override
    public Map&lt;String, Object&gt; getComponentConfiguration() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TICK_FREQUENCY);
        return conf;
    }

    private void rankHashtag(Tuple tuple) {
        String hashtag = tuple.getStringByField(&quot;hashtag&quot;);
        Integer existingIndex = find(hashtag);
        if (null != existingIndex)
            rankings.set(existingIndex, tuple.getValues());
        else
            rankings.add(tuple.getValues());

        Collections.sort(rankings, new Comparator&lt;List&gt;() {
            @Override
            public int compare(List o1, List o2) {
                return compareRanking(o1, o2);
            }
        });

        shrinkRanking();
    }

    private Integer find(String hashtag) {
        for (int i = 0; i &lt; rankings.size(); ++i) {
            String current = (String) rankings.get(i).get(0);
            if (current.equals(hashtag)) {
                return i;
            }
        }
        return null;
    }

    private int compareRanking(List one, List two) {
        long valueOne = (Long) one.get(1);
        long valueTwo = (Long) two.get(1);
        long delta = valueTwo - valueOne;
        if (delta &gt; 0) {
            return 1;
        } else if (delta &lt; 0) {
            return -1;
        } else {
            return 0;
        }
    }

    private void shrinkRanking() {
        int size = rankings.size();
        if (TOPN &gt;= size) return;

        for (int i = TOPN; i &lt; size; i++) {
            rankings.remove(rankings.size() - 1);
        }
    }

    private static boolean isTickTuple(Tuple tuple) {
        return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
                &amp;&amp; tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }
}
</pre>
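<p>Stripped of the Storm plumbing, the ranking keeps the latest count per hashtag and returns the N largest. The sketch below is an alternative formulation, not the bolt above: it stores counts in a map and sorts only when asked (for example on each tick) instead of re-sorting a list on every tuple. TopN, update and top are hypothetical names.</p>

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Compact top-N ranking: latest count per hashtag, sorted only on demand.
final class TopN {

    private final int n;
    private final Map<String, Long> latestCounts = new HashMap<>();

    TopN(int n) {
        this.n = n;
    }

    // Called for every (hashtag, count) tuple: the newest count replaces the old one.
    void update(String hashtag, long count) {
        latestCounts.put(hashtag, count);
    }

    // Called on each tick: the n hashtags with the highest counts, highest first.
    List<String> top() {
        return latestCounts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

<p>The trade-off is memory: unlike shrinkRanking, this version keeps every hashtag it has seen, so a long-running bolt would need some eviction policy.</p>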
<h4>6b. Save results to Cassandra</h4>
<p>Finally, we store the top N hashtags per day in Cassandra. For that we use the <strong>row partitioning</strong> pattern: one row per day, holding the top hashtags for each time bucket (one bucket per tick, i.e. every 10 seconds with the configuration above).</p>
<pre class="brush: java; title: ; notranslate">
public class TopHashtagToCassandraBolt extends CassandraBaseBolt {
    private static final Logger LOG = LoggerFactory.getLogger(TopHashtagToCassandraBolt.class);

    public TopHashtagToCassandraBolt(Properties properties) {
        super(properties);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        List&lt;List&gt; rankings = (List) tuple.getValue(0);

        Map&lt;String, Long&gt; rankingMap = new HashMap&lt;&gt;();

        for (List list : rankings) {
            rankingMap.put((String) list.get(0), (Long) list.get(1));
        }

        DateFormat df = new SimpleDateFormat(&quot;yyyy-MM-dd&quot;);

        Statement statement = QueryBuilder.insertInto(&quot;top_hashtag_by_day&quot;)
                .value(&quot;date&quot;, df.format(new Date()))
                .value(&quot;bucket_time&quot;, QueryBuilder.raw(&quot;dateof(now())&quot;))
                .value(&quot;ranking&quot;, rankingMap);

        LOG.debug(statement.toString());

        session.execute(statement);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
</pre>
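<p>The row partitioning pattern boils down to two key components: a day string as the partition key and a bucket timestamp, presumably the clustering column. The sketch below computes both client-side with java.time, assuming UTC days and a 10-second bucket that matches TICK_FREQUENCY; the bolt above instead formats the date in the JVM&#8217;s default time zone and lets Cassandra fill bucket_time with dateof(now()). TimeBuckets is a hypothetical name.</p>

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Computes the (partition key, bucket) pair for the row partitioning pattern, assuming UTC.
final class TimeBuckets {

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Partition key: one row (partition) per day.
    static String dayPartition(Instant now) {
        return DAY.format(now);
    }

    // Bucket: start of the fixed-size interval that contains `now`.
    static Instant bucketStart(Instant now, long bucketSeconds) {
        long epochSecond = now.getEpochSecond();
        return Instant.ofEpochSecond(epochSecond - Math.floorMod(epochSecond, bucketSeconds));
    }
}
```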
<h3>Next steps</h3>
<p>Although the hashtag counter may work, I would not say that it is entirely correct, and there are better ways to do it:</p>
<ul>
<li>Take a look at this excellent resource: <a href="http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/" target="_blank">http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/</a></li>
<li>Using <strong>Storm Trident</strong>: in the next post I will show how to use this high-level abstraction from Storm, which processes a stream as a sequence of small batches of data (aka <strong>micro-batching</strong>) and fits the top hashtags example better.</li>
</ul>
]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2016/02/07/sentiment-analysis-of-tweets/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Analysis of twitter streams with Kafka and Storm</title>
		<link>http://www.serrate.net/2016/01/05/analysis-of-twitter-streams-with-kafka-and-storm/</link>
		<comments>http://www.serrate.net/2016/01/05/analysis-of-twitter-streams-with-kafka-and-storm/#respond</comments>
		<pubDate>Tue, 05 Jan 2016 01:24:07 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Featured]]></category>
		<category><![CDATA[BigData]]></category>
		<category><![CDATA[Cassandra]]></category>
		<category><![CDATA[Kafka]]></category>
		<category><![CDATA[Storm]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=248</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p>Following my last <a href="http://www.serrate.net/2015/12/13/big-data-streams-and-lambdas/">post</a>, I will present a real-time processing sample with Kafka and Storm using the Twitter Streaming API.</p>
<h2>Overview</h2>
<p style="text-align: center;"><a href="http://www.serrate.net/files/2016/02/TwitterStreaming.png" rel="attachment wp-att-296"><img class="wp-image-296 aligncenter" src="http://www.serrate.net/files/2016/02/TwitterStreaming.png" alt="Twitter Streaming" width="524" height="247" /></a></p>
<p>The solution consists of the following:</p>
<ul>
<li><a href="https://github.com/mserrate/twitter-streaming-app/tree/master/twitter-kafka-producer" target="_blank">twitter-kafka-producer</a>: A very basic producer that reads tweets from the Twitter Streaming API and stores them in Kafka.
<div style="margin-bottom: 4px;"></div>
</li>
<li><a href="https://github.com/mserrate/twitter-streaming-app/tree/master/twitter-storm-topology" target="_blank">twitter-storm-topology</a>: A Storm topology that reads tweets from Kafka and, after applying filtering and sanitization, processes the messages in parallel for:
<ul>
<li><strong>Sentiment Analysis:</strong> Using a sentiment analysis algorithm to classify the tweet into a positive or negative feeling.</li>
<li><strong>Top Hashtags:</strong> Calculates the top 20 hashtags using a sliding window.</li>
</ul>
</li>
</ul>
<h3>Storm Topology</h3>
<p style="text-align: center;"><a href="http://www.serrate.net/files/2016/02/TwitterTopology.png" rel="attachment wp-att-297"><img class=" wp-image-297 aligncenter" src="http://www.serrate.net/files/2016/02/TwitterTopology.png" alt="Twitter Topology" width="577" height="180" /></a></p>
<p>The Storm topology consists of the following elements:</p>
<ul>
<li><strong><a href="https://github.com/apache/storm/tree/master/external/storm-kafka" target="_blank">Kafka Spout</a>:</strong> The spout implementation to read messages from Kafka.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/TwitterFilterBolt.java" target="_blank">Filtering</a>:</strong> Filters out all non-English tweets.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/TextSanitizationBolt.java" target="_blank">Sanitization</a>:</strong> Normalizes the text so that it can be processed properly by the sentiment analysis algorithm.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/SentimentAnalysisBolt.java" target="_blank">Sentiment Analysis</a>:</strong> The algorithm that analyses the text of the tweet word by word, giving a value between -1 and 1.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/SentimentAnalysisToCassandraBolt.java" target="_blank">Sentiment Analysis to Cassandra</a>:</strong> Stores the tweets and their sentiment values in Cassandra.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/HashtagSplitterBolt.java" target="_blank">Hashtag Splitter</a>:</strong> Splits the different hashtags appearing in a tweet.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/HashtagCounterBolt.java" target="_blank">Hashtag Counter</a>:</strong> Counts hashtag occurrences.
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/TopHashtagBolt.java" target="_blank">Top Hashtag</a>:</strong> Ranks the top 20 hashtags over a sliding window (using the <em>Tick Tuple</em> feature from Storm).
<div style="margin-bottom: 4px;"></div>
</li>
<li><strong><a href="https://github.com/mserrate/twitter-streaming-app/blob/master/twitter-storm-topology/src/main/java/bolts/TopHashtagToCassandraBolt.java" target="_blank">Top Hashtag to Cassandra</a>:</strong> Stores the top 20 hashtags in Cassandra.</li>
</ul>
<h3>Summary</h3>
<p>In this post we have seen the benefits of using <strong>Apache Kafka</strong> &amp; <strong>Apache Storm</strong> to ingest and process streams of data. In the next posts we will look at the implementation details and provide some analytical insight from the data stored in <strong>Cassandra</strong>.</p>
<p>The sample can be found on Github: <a href="https://github.com/mserrate/twitter-streaming-app">https://github.com/mserrate/twitter-streaming-app</a>.</p>
]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2016/01/05/analysis-of-twitter-streams-with-kafka-and-storm/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Big Data: streams and lambdas</title>
		<link>http://www.serrate.net/2015/12/13/big-data-streams-and-lambdas/</link>
		<comments>http://www.serrate.net/2015/12/13/big-data-streams-and-lambdas/#respond</comments>
		<pubDate>Sun, 13 Dec 2015 00:31:08 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Featured]]></category>
		<category><![CDATA[BigData]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=225</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p><img width="656" height="200" src="http://www.serrate.net/files/2015/12/stream_banner.png" class="attachment-post-thumbnail size-post-thumbnail wp-post-image" alt="" srcset="http://www.serrate.net/files/2015/12/stream_banner.png 656w, http://www.serrate.net/files/2015/12/stream_banner-300x91.png 300w" sizes="(max-width: 656px) 100vw, 656px" /></p>For some years now I have been working on distributed systems and event-driven architectures, from the often misunderstood <a href="http://www.serrate.net/tag/soa/" target="_blank">SOA</a> (or its refurbished version, known as <a href="http://udidahan.com/2014/03/31/on-that-microservices-thing/" target="_blank">Microservices</a>) to <a href="http://www.serrate.net/2015/02/17/speaking-at-dotnetspain-conference/" target="_blank">Event Sourcing</a>.

Some event-related concepts from these systems, such as <strong>immutability</strong>, <strong>perpetuity</strong> and <strong>versioning</strong>, apply equally to stream processing. Stream processing, together with batch processing, is sometimes referred to as Big Data.
<h3>Big Data</h3>
When we think about Big Data, the first thing that comes to mind is <strong>Hadoop</strong> for batch processing. Although Hadoop can process indecent amounts of data, it also comes with high response latency.

This latency is not a problem for many use cases, but it may be one when we need real-time (or near-real-time) feedback.

That's where the <a href="http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html" target="_blank">Lambda Architecture</a> (by Nathan Marz) comes in. It describes how to design a system where most of the data is processed by the batch layer while, at the same time, the streams coming into the system are processed as they arrive:

<a href="http://www.serrate.net/files/2015/12/Lambda_en.png" rel="attachment wp-att-228"><img class="size-full wp-image-228 aligncenter" src="http://www.serrate.net/files/2015/12/Lambda_en.png" alt="Lambda architecture" width="483" height="223" /></a>

From this we can say that:
<p style="text-align: left;"><strong>Current View = Query(Batch View) + Query(Stream View)</strong></p>
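
For an additive metric such as a hashtag count, the formula above can be sketched in a few lines of Python. This is an illustration under assumptions, not part of the Lambda Architecture definition. Both views are plain dictionaries, which are hypothetical stand-ins for the serving databases. The batch view covers all data up to the last completed batch run, and the stream view covers only the events that arrived after it.

```python
def query(view, hashtag):
    # Look up a precomputed count; a missing key means no occurrences yet.
    return view.get(hashtag, 0)


def current_view(batch_view, stream_view, hashtag):
    # Current View = Query(Batch View) + Query(Stream View)
    return query(batch_view, hashtag) + query(stream_view, hashtag)


# Hypothetical data: batch_view comes from the last batch run,
# stream_view only holds events that arrived after that run.
batch_view = {"kafka": 1200, "storm": 800}
stream_view = {"kafka": 15, "flink": 3}

print(current_view(batch_view, stream_view, "kafka"))  # 1215
```

Plain addition only works here because counts are additive and the two views cover disjoint time ranges. Other metrics, such as unique visitors, need a different merge function.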

<h4>Batch Layer</h4>
The batch processing layer computes arbitrary views over the entire historical dataset. The obvious example of batch processing is <strong>Hadoop</strong> or, more precisely, the distributed file system <strong>HDFS</strong> plus a processing tool such as <strong>MapReduce</strong> or <strong>Pig</strong>.

The result of this process is stored in a database that needs to support batch writes (ElephantDB, HBase) but not random writes. This makes the database architecture extremely simple, because features such as online compaction or concurrency control are no longer needed.
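
As a rough sketch of this idea, the batch layer below recomputes its view from scratch over the whole append-only master dataset and then publishes it in a single swap. The names compute_batch_view and BatchServingView are hypothetical. A read-only in-memory mapping stands in for a batch-write database such as ElephantDB.

```python
from collections import Counter
from types import MappingProxyType


def compute_batch_view(master_dataset):
    # Recompute from scratch over the entire history of events.
    return Counter(tag for event in master_dataset for tag in event["hashtags"])


class BatchServingView:
    """Read-only view that can only be replaced as a whole (a batch write).

    There is deliberately no method to update a single key. This mirrors
    the point above: without random writes, the store can drop features
    such as online compaction.
    """

    def __init__(self):
        self._view = MappingProxyType({})

    def swap(self, new_view):
        # Publish a complete new version with a single reference assignment.
        self._view = MappingProxyType(dict(new_view))

    def get(self, key):
        return self._view.get(key, 0)


# The master dataset is append-only: events are added, never modified.
master_dataset = [{"hashtags": ["kafka", "storm"]}, {"hashtags": ["kafka"]}]
serving = BatchServingView()
serving.swap(compute_batch_view(master_dataset))
print(serving.get("kafka"))  # 2
```
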
<h4>Stream Layer</h4>
The stream processing layer processes events one at a time, giving immediate feedback. Depending on the number of events or the required throughput, we may use different technologies: <strong>Spark Streaming</strong> (although it is micro-batch, its latency may be sufficient for many use cases), <strong>Storm</strong>, <strong>Samza</strong> or <strong>Flink</strong>.

The result of this process is stored in a database that must support random writes; one option is <strong>Cassandra</strong>.
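
By contrast, the stream layer updates its view incrementally as each event arrives, which is why it needs random writes. In this hypothetical sketch, StreamView is an in-memory stand-in for a random-write store such as Cassandra.

```python
from collections import defaultdict


class StreamView:
    """Realtime view updated one event at a time (random writes per key)."""

    def __init__(self):
        self.counts = defaultdict(int)

    def process(self, event):
        # Only the keys touched by this event are written.
        for tag in event["hashtags"]:
            self.counts[tag] += 1

    def get(self, key):
        # dict.get does not trigger the defaultdict factory, so reads never insert keys.
        return self.counts.get(key, 0)


view = StreamView()
for event in [{"hashtags": ["kafka", "storm"]}, {"hashtags": ["kafka"]}]:
    view.process(event)
print(view.get("kafka"))  # 2
```
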

&nbsp;

In the following posts I will present concrete examples, with Docker images, using some of the technologies I have worked with: Kafka, Storm, Cassandra and Druid.]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2015/12/13/big-data-streams-and-lambdas/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Speaking at DotNetSpain conference</title>
		<link>http://www.serrate.net/2015/02/17/speaking-at-dotnetspain-conference/</link>
		<comments>http://www.serrate.net/2015/02/17/speaking-at-dotnetspain-conference/#respond</comments>
		<pubDate>Tue, 17 Feb 2015 17:36:13 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[CQRS]]></category>
		<category><![CDATA[EventStore]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=214</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p><img width="639" height="71" src="http://www.serrate.net/files/2015/02/unnamed.jpg" class="attachment-post-thumbnail size-post-thumbnail wp-post-image" alt="" srcset="http://www.serrate.net/files/2015/02/unnamed.jpg 639w, http://www.serrate.net/files/2015/02/unnamed-300x33.jpg 300w" sizes="(max-width: 639px) 100vw, 639px" /></p>It's been too long since my last post... Anyway, this is a short reminder that I will be speaking at <a title="DotNetSpain" href="https://www.desarrollaconmicrosoft.com/Dotnetspain2015/Agenda?day=27" target="_blank">DotNetSpain</a> 2015 about Complex Event Processing, Immutability and Projections with <a title="GetEventStore" href="http://geteventstore.com" target="_blank">EventStore</a>.

So, if you are interested, come and say hi!

<img class="alignleft wp-image-215" src="http://www.serrate.net/files/2015/02/sesiondotnetspain-1024x403.png" alt="" width="625" height="246" />]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2015/02/17/speaking-at-dotnetspain-conference/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Webcast about SOA, DDD &#038; CQRS with NServiceBus</title>
		<link>http://www.serrate.net/2012/11/05/webcast-about-soa-ddd-cqrs-with-nservicebus/</link>
		<comments>http://www.serrate.net/2012/11/05/webcast-about-soa-ddd-cqrs-with-nservicebus/#respond</comments>
		<pubDate>Mon, 05 Nov 2012 10:46:09 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[NServiceBus]]></category>
		<category><![CDATA[CQRS]]></category>
		<category><![CDATA[DDD]]></category>
		<category><![CDATA[SOA]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=194</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p>Tomorrow I will be giving a webcast in Spanish about SOA, DDD and CQRS with NServiceBus. In this talk I will cover DDD strategic design, bounded contexts and how to model domain logic with NServiceBus Sagas.</p>
<p>You can see the details in the following link:</p>
<div class="aligncenter"><a href="https://msevents.microsoft.com/CUI/EventDetail.aspx?EventID=1032535739&amp;Culture=es-ES&amp;community=0" target="_blank"><img class="aligncenter" alt="" src="https://mseventsww.microsoft.com/BannerImages/567e02dc-4da6-42a6-b02f-74f06c434dc7.jpg" /></a></div>
]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2012/11/05/webcast-about-soa-ddd-cqrs-with-nservicebus/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>NServiceBus: DRY with unobtrusive conventions</title>
		<link>http://www.serrate.net/2012/10/16/nservicebus-dry-with-unobtrusive-conventions/</link>
		<comments>http://www.serrate.net/2012/10/16/nservicebus-dry-with-unobtrusive-conventions/#comments</comments>
		<pubDate>Tue, 16 Oct 2012 08:20:11 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[NServiceBus]]></category>
		<category><![CDATA[SOA]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=183</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p>When working with NServiceBus in <a href="http://nservicebus.com/docs/Samples/UnobtrusiveModeSample.aspx" target="_blank">unobtrusive mode</a>, you may often feel that you are repeating the same conventions over and over again in every endpoint.</p>
<p>The <strong>IWantToRunBeforeConfiguration</strong> interface is a great help for applying the DRY principle.</p>
<p>Just define your implementation in an assembly referenced by all the endpoints:</p>
<pre class="brush: csharp; title: ; notranslate">
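public class UnobtrusiveConventions : IWantToRunBeforeConfiguration
{
    // NServiceBus finds this class by assembly scanning and calls Init()
    // before configuring each endpoint, so the conventions live in one place.
    public void Init()
    {
        // Identify message types by namespace instead of marker interfaces,
        // so message assemblies do not need to reference NServiceBus.
        Configure.Instance
            .DefiningCommandsAs(t =&gt; t.Namespace != null
                &amp;&amp; t.Namespace.EndsWith(&quot;Commands&quot;))
            .DefiningEventsAs(t =&gt; t.Namespace != null
                &amp;&amp; t.Namespace.EndsWith(&quot;Events&quot;))
            .DefiningMessagesAs(t =&gt; t.Namespace != null
                &amp;&amp; t.Namespace.EndsWith(&quot;Messages&quot;));
    }
}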
</pre>
<p>NServiceBus will then pick up this class automatically in each endpoint.</p>
]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2012/10/16/nservicebus-dry-with-unobtrusive-conventions/feed/</wfw:commentRss>
		<slash:comments>4</slash:comments>
		</item>
		<item>
		<title>NServiceBus Training</title>
		<link>http://www.serrate.net/2012/09/11/nservicebus-training/</link>
		<comments>http://www.serrate.net/2012/09/11/nservicebus-training/#respond</comments>
		<pubDate>Tue, 11 Sep 2012 21:28:47 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[CQRS]]></category>
		<category><![CDATA[DDD]]></category>
		<category><![CDATA[NServiceBus]]></category>
		<category><![CDATA[SOA]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=168</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p>I will be teaching <a href="http://www.udidahan.com/" target="_blank"><strong>Udi Dahan</strong></a>&#8217;s 4-day course, Enterprise Development with <a href="http://www.nservicebus.com" target="_blank">NServiceBus</a>, in Spain on the following dates:</p>
<ul>
<li><a title="Register" href="http://masterarquitecturabcn.eventbrite.com/" target="_blank">November 26. Barcelona</a></li>
<li><a title="Register" href="http://masterarquitecturamad.eventbrite.com/" target="_blank">December 10. Madrid</a></li>
</ul>
<p>&nbsp;</p>
<p>More info at: <a href="http://udidahan.com/2012/10/12/training-for-this-winter/" target="_blank">http://udidahan.com/2012/10/12/training-for-this-winter/</a></p>
]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2012/09/11/nservicebus-training/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>Cassandra on Azure CentOS VM</title>
		<link>http://www.serrate.net/2012/08/27/cassandra-on-azure-centos-vm/</link>
		<comments>http://www.serrate.net/2012/08/27/cassandra-on-azure-centos-vm/#respond</comments>
		<pubDate>Mon, 27 Aug 2012 09:36:02 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Featured]]></category>
		<category><![CDATA[NoSQL]]></category>
		<category><![CDATA[Azure]]></category>
		<category><![CDATA[BigData]]></category>
		<category><![CDATA[Cassandra]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=141</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p><img width="656" height="200" src="http://www.serrate.net/files/2012/08/cassandravm.png" class="attachment-post-thumbnail size-post-thumbnail wp-post-image" alt="Cassandra on Azure" srcset="http://www.serrate.net/files/2012/08/cassandravm.png 656w, http://www.serrate.net/files/2012/08/cassandravm-300x91.png 300w" sizes="(max-width: 656px) 100vw, 656px" /></p>Having had some fun with Cassandra lately, I wanted to figure out how to set up a working environment on the new Windows Azure VM roles, so I decided to give it a try and install a <strong>Cassandra cluster on CentOS</strong>.

Although it targets Ubuntu, the following article is a good guide that helped me configure a Linux cluster: <a href="https://www.windowsazure.com/en-us/manage/linux/other-resources/how-to-run-cassandra-with-linux/" target="_blank">https://www.windowsazure.com/en-us/manage/linux/other-resources/how-to-run-cassandra-with-linux/</a>

We create the 1<sup>st</sup> VM, assigning a PEM certificate so that we can access it over SSH:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/1.png"><img class="size-full wp-image-144 aligncenter" style="border: 1px solid black;" title="Create VM" src="http://www.serrate.net/files/2012/08/1.png" alt="Create VM" width="500" height="340" /></a></p>
<span id="more-141"></span>and then we assign the DNS name:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/2.png"><img class="size-full wp-image-145 aligncenter" style="border: 1px solid black;" title="Assign DNS name" src="http://www.serrate.net/files/2012/08/2.png" alt="Assign DNS name" width="500" height="232" /></a></p>
In the following step, we connect the remaining VMs to the 1<sup>st</sup> one:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/3.png"><img class="aligncenter size-full wp-image-147" style="border: 1px solid black;" title="Connect existing VM" src="http://www.serrate.net/files/2012/08/3.png" alt="Connect existing VM" width="500" height="215" /></a></p>

<h2>Cassandra installation</h2>
I installed Cassandra from the <strong>DataStax</strong> repository because I found its packages and documentation pretty good, but you can install it from the Apache repository as well.

For detailed instructions, follow this link: <a href="http://www.datastax.com/docs/1.1/install/install_rpm">http://www.datastax.com/docs/1.1/install/install_rpm</a> (just make sure to install the Oracle JRE, because CentOS comes with OpenJDK installed by default, and then use the <em>alternatives</em> command to make the Oracle JRE the default). You will need to open public TCP port <strong>9160</strong> (the Thrift client port) on all the VMs in the cluster so that it is load balanced across them.
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/4.png"><img class="aligncenter size-full wp-image-148" style="border: 1px solid black;" title="Cassandra port" src="http://www.serrate.net/files/2012/08/4.png" alt="Cassandra port" width="500" height="107" /></a></p>
You can also install <strong>OpsCenter</strong> (a web-based management and monitoring tool) by following <a href="http://www.datastax.com/docs/opscenter/install/install_rhel">http://www.datastax.com/docs/opscenter/install/install_rhel</a>. You will then need to open the OpsCenter port:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/5.png"><img class="aligncenter size-full wp-image-150" title="OpsCenter endpoint" src="http://www.serrate.net/files/2012/08/5.png" alt="OpsCenter endpoint" width="350" height="252" /></a></p>
It looks like this:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/7.png"><img class="aligncenter size-full wp-image-152" title="OpsCenter UI" src="http://www.serrate.net/files/2012/08/7.png" alt="OpsCenter UI" width="500" height="250" /></a></p>
And here is Cassandra's <strong>ring view</strong>:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/8.png"><img class="aligncenter size-medium wp-image-153" title="Ring view" src="http://www.serrate.net/files/2012/08/8-300x292.png" alt="Ring view" width="300" height="292" /></a></p>
In upcoming posts I will cover using Cassandra from .NET.]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2012/08/27/cassandra-on-azure-centos-vm/feed/</wfw:commentRss>
		<slash:comments>0</slash:comments>
		</item>
		<item>
		<title>UI Composition for Services</title>
		<link>http://www.serrate.net/2012/08/22/ui-composition-for-services/</link>
		<comments>http://www.serrate.net/2012/08/22/ui-composition-for-services/#comments</comments>
		<pubDate>Wed, 22 Aug 2012 07:22:49 +0000</pubDate>
		<dc:creator><![CDATA[Marçal]]></dc:creator>
				<category><![CDATA[Architecture]]></category>
		<category><![CDATA[Featured]]></category>
		<category><![CDATA[DDD]]></category>
		<category><![CDATA[SOA]]></category>

		<guid isPermaLink="false">http://www.serrate.net/?p=15</guid>
		<description><![CDATA[&#8230;]]></description>
				<content:encoded><![CDATA[<p><img width="656" height="272" src="http://www.serrate.net/files/2012/08/uicomposition.jpg" class="attachment-post-thumbnail size-post-thumbnail wp-post-image" alt="" srcset="http://www.serrate.net/files/2012/08/uicomposition.jpg 656w, http://www.serrate.net/files/2012/08/uicomposition-300x124.jpg 300w" sizes="(max-width: 656px) 100vw, 656px" /></p>One of the most important concepts when applying either <strong>SOA</strong> or <strong>DDD</strong> is the definition of Services (or Bounded Contexts in the DDD lingo).

Each of these services is responsible for its own <strong>data and behavior</strong> and can also own its UI components.

Let’s look at an example from the typical e-commerce domain:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/Services.png"><img class="aligncenter  wp-image-24" style="border: 1px solid black;" title="Services" src="http://www.serrate.net/files/2012/08/Services.png" alt="" width="420" height="258" /></a></p>
<span id="more-15"></span>
In this case we have two services, Sales &amp; Shipping, each of which owns UI components that are rendered by the UI host. This is the result:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/uicomposition.png"><img class="aligncenter size-full wp-image-27" title="Ecommerce composition" src="http://www.serrate.net/files/2012/08/uicomposition-500x468.png" alt="Ecommerce composition" width="500" height="468" /></a></p>

<h2>Composition with ASP.NET MVC</h2>
In this example, I'm using <a href="http://razorgenerator.codeplex.com/" target="_blank">Razor Generator</a> so that each service can keep its own MVC components in a class library referenced by the central UI host:
<p style="text-align: center;"><a href="http://www.serrate.net/files/2012/08/VSSolution.png"><img class="size-full wp-image-31 aligncenter" style="border: 1px solid black;" title="VS solution" src="http://www.serrate.net/files/2012/08/VSSolution.png" alt="VS solution" width="321" height="596" /></a></p>
The use of Razor Generator is very straightforward:
<ol>
	<li>Install RazorGenerator from the Visual Studio Gallery</li>
	<li>In each class library, install RazorGenerator.Mvc from NuGet</li>
	<li>Create the folder structure following MVC conventions (Controllers, Views, etc.)</li>
	<li>Add a <a href="https://github.com/mserrate/serrate.net/blob/master/UIComposition/Serrate.Sales.UI/Web.config" target="_blank">web.config</a> to get IntelliSense for your views</li>
	<li>Set the "Custom Tool" property of your views to RazorGenerator so that they are precompiled</li>
</ol>
Then, decorate your actions with ChildActionOnly so that they behave like widgets:

<pre class="brush: csharp; title: ; notranslate">
public class FinishOrderController : Controller
{
	private readonly IBus bus;

	public FinishOrderController(IBus bus)
	{
		this.bus = bus;
	}

	// Can only be rendered from another view (e.g. with Html.Action),
	// never requested directly by URL, so it behaves like a widget.
	[ChildActionOnly]
	public ActionResult SubmitOrCancel()
	{
		ViewBag.OrderId = TheSession.OrderId;

		return PartialView();
	}

	// Handles the widget's form post: send a SubmitOrder command
	// over the bus, then redirect to the Order/Processed action.
	[HttpPost]
	public ActionResult SubmitOrder()
	{
		var cmd = new SubmitOrder()
		{
			OrderId = TheSession.OrderId
		};

		this.bus.Send(cmd);

		return RedirectToAction(&quot;Processed&quot;, &quot;Order&quot;);
	}

	// Same flow for cancellation, with a CancelOrder command.
	[HttpPost]
	public ActionResult CancelOrder()
	{
		var cmd = new CancelOrder()
		{
			OrderId = TheSession.OrderId
		};

		this.bus.Send(cmd);

		return RedirectToAction(&quot;Cancelled&quot;, &quot;Order&quot;);
	}
}
</pre>

Take a look at the sample on GitHub and let me know what you think!

<a href="https://github.com/mserrate/serrate.net/tree/master/UIComposition">https://github.com/mserrate/serrate.net/tree/master/UIComposition</a>]]></content:encoded>
			<wfw:commentRss>http://www.serrate.net/2012/08/22/ui-composition-for-services/feed/</wfw:commentRss>
		<slash:comments>1</slash:comments>
		</item>
	</channel>
</rss>
